-
Notifications
You must be signed in to change notification settings - Fork 0
/
knowledge_bases.py
229 lines (193 loc) · 7.51 KB
/
knowledge_bases.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# knowledge_bases.py
from dataclasses import dataclass
import sqlite3
import json
@dataclass
class KnowledgeBaseConfig:
db_path: str
schema: dict
agent_type: str
class AgentKnowledgeBase:
def __init__(self, config: KnowledgeBaseConfig):
self.conn = sqlite3.connect(config.db_path)
self.agent_type = config.agent_type
self.init_schema(config.schema)
def init_schema(self, schema):
cursor = self.conn.cursor()
for table_name, columns in schema.items():
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INTEGER PRIMARY KEY,
{columns},
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.commit()
# Implementation of specific knowledge bases
class MemoryKnowledgeBase(AgentKnowledgeBase):
def __init__(self):
super().__init__(KnowledgeBaseConfig(
db_path="memory_kb.sqlite",
schema={
"concepts": "name TEXT, importance FLOAT, usage_count INTEGER",
"patterns": "pattern TEXT, frequency INTEGER, last_seen TIMESTAMP",
"relationships": "source_id INTEGER, target_id INTEGER, strength FLOAT"
},
agent_type="memory"
))
class EvolutionKnowledgeBase(AgentKnowledgeBase):
def __init__(self):
super().__init__(KnowledgeBaseConfig(
db_path="evolution_kb.sqlite",
schema={
"fitness_metrics": "metric_name TEXT, weight FLOAT, adaptation_rate FLOAT",
"optimization_rules": "rule TEXT, priority INTEGER, success_rate FLOAT",
"evolution_history": "change TEXT, impact FLOAT, timestamp TIMESTAMP"
},
agent_type="evolution"
))
# communication.py
class MessageBroker:
def __init__(self):
self.queues = {}
self.subscribers = {}
self.feedback_loops = {}
async def publish(self, topic: str, message: dict):
if topic in self.subscribers:
for subscriber in self.subscribers[topic]:
await subscriber.process_message(message)
async def subscribe(self, topic: str, callback):
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(callback)
class FeedbackController:
def __init__(self):
self.feedback_history = []
self.learning_rate = 0.1
async def process_feedback(self, source_agent: str, target_agent: str, feedback: dict):
self.feedback_history.append({
"source": source_agent,
"target": target_agent,
"feedback": feedback,
"timestamp": datetime.now().isoformat()
})
# Analyze feedback patterns
patterns = self.analyze_feedback_patterns()
# Update agent behaviors based on feedback
await self.update_agent_behavior(target_agent, patterns)
# prompt_management.py
class PromptTemplate:
def __init__(self, agent_type: str):
self.agent_type = agent_type
self.templates = self.load_templates()
self.performance_metrics = {}
def load_templates(self) -> dict:
return {
"memory": {
"process_memory": """
Process this memory with the following considerations:
Context: {context}
Previous Memories: {previous_memories}
Tasks:
1. Analyze semantic content
2. Extract key relationships
3. Determine storage priority
4. Identify potential connections
Format response as JSON with reasoning.
""",
"consolidate_memory": """..."""
},
"evolution": {
"evaluate_fitness": """
Evaluate system fitness with these metrics:
Current State: {current_state}
Historical Performance: {history}
Consider:
1. Resource efficiency
2. Learning rate
3. Adaptation speed
4. Error reduction
Provide numerical scores and explanations.
""",
"optimize_system": """..."""
},
"supervisor": {
"monitor_system": """
Monitor system health:
Agent States: {agent_states}
Resource Usage: {resources}
Error Rates: {errors}
Tasks:
1. Identify bottlenecks
2. Suggest optimizations
3. Coordinate resources
4. Prioritize actions
Provide structured supervision plan.
""",
"coordinate_agents": """..."""
}
}
class PromptOptimizer:
def __init__(self):
self.performance_history = []
self.optimization_rules = {}
async def optimize_prompt(self, template: str, performance_metrics: dict) -> str:
# Analyze prompt performance
optimization_score = self.calculate_optimization_score(performance_metrics)
# Apply optimization rules
optimized_template = self.apply_optimizations(template, optimization_score)
return optimized_template
# Enhanced Agent Implementation
class EnhancedAgent(BaseAgent):
def __init__(self, config: AgentConfig, knowledge_base: AgentKnowledgeBase):
super().__init__(config)
self.knowledge_base = knowledge_base
self.prompt_template = PromptTemplate(config.agent_type)
self.message_broker = MessageBroker()
self.feedback_controller = FeedbackController()
async def process_message(self, message: dict):
# Process incoming message
prompt = self.prompt_template.get_prompt(
template_name=message["type"],
context=message["context"]
)
# Generate response
response = await self.generate_response(prompt)
# Store in knowledge base
await self.knowledge_base.store_interaction(message, response)
# Publish feedback
await self.message_broker.publish(
"feedback",
{
"source": self.config.agent_type,
"content": response,
"performance_metrics": self.calculate_performance(response)
}
)
return response
# Example usage
async def main():
# Initialize knowledge bases
memory_kb = MemoryKnowledgeBase()
evolution_kb = EvolutionKnowledgeBase()
# Initialize agents with knowledge bases
memory_agent = EnhancedAgent(
config=AgentConfig(...),
knowledge_base=memory_kb
)
evolution_agent = EnhancedAgent(
config=AgentConfig(...),
knowledge_base=evolution_kb
)
# Set up communication
message_broker = MessageBroker()
await message_broker.subscribe("memory_events", memory_agent.process_message)
await message_broker.subscribe("evolution_events", evolution_agent.process_message)
# Example interaction
test_message = {
"type": "process_memory",
"content": "New memory to process",
"context": {"previous_memories": [...]}
}
# Publish message and get response
await message_broker.publish("memory_events", test_message)