Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async workflow with agentlist #631

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,28 @@ print(f"Generated data: {generated_data}")


```
## OpenAI Assistant Integration

Swarms provides native support for OpenAI's Assistants API through the
`OpenAIAssistant` class:

```python

from swarms import OpenAIAssistant

#Create an assistant
assistant = OpenAIAssistant(
name="Research Assistant",
instructions="You help with research tasks.",
tools=[{"type": "code_interpreter"}]
)

#Run tasks
response = assistant.run("Analyze this dataset")

```

See the [OpenAI Assistant documentation](docs/swarms/agents/openai_assistant.md) for more details.

### Multi Modal Autonomous Agent
Run the agent with multiple modalities useful for various real-world tasks in manufacturing, logistics, and health.
Expand Down Expand Up @@ -1830,7 +1852,7 @@ Documentation is located here at: [docs.swarms.world](https://docs.swarms.world)
-----

## Folder Structure
The swarms package has been meticlously crafted for extreme use-ability and understanding, the swarms package is split up into various modules such as `swarms.agents` that holds pre-built agents, `swarms.structs` that holds a vast array of structures like `Agent` and multi agent structures. The 3 most important are `structs`, `models`, and `agents`.
The swarms package has been meticlously crafted for extreme use-ability and understanding, the swarms package is split up into various modules such as `swarms.agents` that holds pre-built agents, `swarms.structs` that holds a vast array of structures like `Agent` and multi agent structures. The 3 most important are `structs`, `models`, and `agents`.

```sh
├── __init__.py
Expand Down
2 changes: 2 additions & 0 deletions swarms/structs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.async_workflow import AsyncWorkflow
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.conversation import Conversation
from swarms.structs.graph_workflow import (
Expand Down Expand Up @@ -78,6 +79,7 @@

__all__ = [
"Agent",
"AsyncWorkflow",
"AutoSwarm",
"AutoSwarmRouter",
"BaseStructure",
Expand Down
62 changes: 62 additions & 0 deletions swarms/structs/async_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
from typing import Any, Callable, List, Optional
from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger

class AsyncWorkflow(BaseWorkflow):
def __init__(
self,
name: str = "AsyncWorkflow",
agents: List[Agent] = None,
max_workers: int = 5,
dashboard: bool = False,
autosave: bool = False,
verbose: bool = False,
**kwargs
):
super().__init__(agents=agents, **kwargs)
self.name = name
self.agents = agents or []
self.max_workers = max_workers
self.dashboard = dashboard
self.autosave = autosave
self.verbose = verbose
self.task_pool = []
self.results = []
self.loop = None

async def _execute_agent_task(self, agent: Agent, task: str) -> Any:
"""Execute a single agent task asynchronously"""
try:
if self.verbose:
logger.info(f"Agent {agent.agent_name} processing task: {task}")
result = await agent.arun(task)
if self.verbose:
logger.info(f"Agent {agent.agent_name} completed task")
return result
except Exception as e:
logger.error(f"Error in agent {agent.agent_name}: {str(e)}")
return str(e)

async def run(self, task: str) -> List[Any]:
"""Run the workflow with all agents processing the task concurrently"""
if not self.agents:
raise ValueError("No agents provided to the workflow")

try:
# Create tasks for all agents
tasks = [self._execute_agent_task(agent, task) for agent in self.agents]

# Execute all tasks concurrently
self.results = await asyncio.gather(*tasks, return_exceptions=True)

if self.autosave:
# Implement autosave logic here
pass

return self.results

except Exception as e:
logger.error(f"Error in workflow execution: {str(e)}")
raise
58 changes: 58 additions & 0 deletions swarms/structs/swarm_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def __init__(
flow: str = None,
return_json: bool = True,
auto_generate_prompts: bool = False,
output_files: Dict[str, str] = None, # New parameter
output_formats: List[str] = None, # New parameter
output_dir: str = "swarm_outputs", # New parameter
*args,
**kwargs,
):
Expand All @@ -94,6 +97,28 @@ def __init__(
self.auto_generate_prompts = auto_generate_prompts
self.logs = []

# Initialize output settings
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)

# Default output formats if none specified
self.output_formats = output_formats or ["json"]

# Custom output file paths or use defaults
self.output_files = output_files or {
fmt: self.output_dir / f"{self.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{fmt}"
for fmt in self.output_formats
}

# Format handlers
self._format_handlers = {
"json": self._to_json,
"yaml": self._to_yaml,
"md": self._to_markdown,
"csv": self._to_csv,
"txt": self._to_text
}

self.reliability_check()

self._log(
Expand Down Expand Up @@ -287,6 +312,7 @@ def run(self, task: str, *args, **kwargs) -> Any:
task=task,
metadata={"result": str(result)},
)
self._save_outputs(result)
return result
except Exception as e:
self._log(
Expand All @@ -297,6 +323,38 @@ def run(self, task: str, *args, **kwargs) -> Any:
)
raise

def _to_json(self, data: Any) -> str:
return json.dumps(data, indent=2)

def _to_yaml(self, data: Any) -> str:
return yaml.dump(data)

def _to_markdown(self, data: Any) -> str:
md = f"# {self.name} Results\n\n"
for agent_name, result in data.items():
md += f"## {agent_name}\n\n{result}\n\n"
return md

def _to_csv(self, data: Any) -> str:
if isinstance(data, dict):
df = pd.DataFrame.from_dict(data, orient='index')
return df.to_csv()
return ""

def _to_text(self, data: Any) -> str:
return str(data)

def _save_outputs(self, results: Any):
"""Save results in all specified formats"""
for fmt, filepath in self.output_files.items():
if fmt in self._format_handlers:
try:
formatted_data = self._format_handlers[fmt](results)
with open(filepath, 'w') as f:
f.write(formatted_data)
except Exception as e:
logger.error(f"Failed to save {fmt} output: {str(e)}")

def batch_run(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:
Expand Down
35 changes: 35 additions & 0 deletions tests/structs/test_async_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest
import asyncio
from swarms import Agent, AsyncWorkflow
from swarm_models import OpenAIChat # or any other model you prefer

@pytest.mark.asyncio
async def test_async_workflow():
# Create test agents
model = OpenAIChat() # Initialize with appropriate parameters
agents = [
Agent(
agent_name=f"Test-Agent-{i}",
llm=model,
max_loops=1,
dashboard=False,
verbose=True,
)
for i in range(3)
]

# Initialize workflow
workflow = AsyncWorkflow(
name="Test-Async-Workflow",
agents=agents,
max_workers=3,
verbose=True
)

# Run test task
test_task = "What is 2+2?"
results = await workflow.run(test_task)

# Assertions
assert len(results) == len(agents)
assert all(isinstance(result, str) for result in results)