From bd0a89705f4bb726dc369d6001aa154387173fc5 Mon Sep 17 00:00:00 2001 From: Occupying-Mars Date: Sun, 10 Nov 2024 21:40:12 +0530 Subject: [PATCH 1/3] async_workkflow --- tests/structs/test_async_workflow.py | 35 ++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 tests/structs/test_async_workflow.py diff --git a/tests/structs/test_async_workflow.py b/tests/structs/test_async_workflow.py new file mode 100644 index 000000000..512fd0eb9 --- /dev/null +++ b/tests/structs/test_async_workflow.py @@ -0,0 +1,35 @@ +import pytest +import asyncio +from swarms import Agent, AsyncWorkflow +from swarm_models import OpenAIChat # or any other model you prefer + +@pytest.mark.asyncio +async def test_async_workflow(): + # Create test agents + model = OpenAIChat() # Initialize with appropriate parameters + agents = [ + Agent( + agent_name=f"Test-Agent-{i}", + llm=model, + max_loops=1, + dashboard=False, + verbose=True, + ) + for i in range(3) + ] + + # Initialize workflow + workflow = AsyncWorkflow( + name="Test-Async-Workflow", + agents=agents, + max_workers=3, + verbose=True + ) + + # Run test task + test_task = "What is 2+2?" + results = await workflow.run(test_task) + + # Assertions + assert len(results) == len(agents) + assert all(isinstance(result, str) for result in results) From 0ea39bb905ee64f11175728e2c5beae1941c01d5 Mon Sep 17 00:00:00 2001 From: Occupying-Mars Date: Wed, 13 Nov 2024 16:18:56 +0530 Subject: [PATCH 2/3] asyncworkflow with agent lists --- README.md | 24 ++++++++++++- swarms/structs/__init__.py | 2 ++ swarms/structs/async_workflow.py | 62 ++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 swarms/structs/async_workflow.py diff --git a/README.md b/README.md index 96be5b671..f2b8eb20c 100644 --- a/README.md +++ b/README.md @@ -399,6 +399,28 @@ print(f"Generated data: {generated_data}") ``` +## OpenAI Assistant Integration + +Swarms provides native support for OpenAI's Assistants API through the + `OpenAIAssistant` class: + +```python + +from swarms import OpenAIAssistant + +#Create an assistant +assistant = OpenAIAssistant( + name="Research Assistant", + instructions="You help with research tasks.", + tools=[{"type": "code_interpreter"}] +) + +#Run tasks +response = assistant.run("Analyze this dataset") + +``` + +See the [OpenAI Assistant documentation](docs/swarms/agents/openai_assistant.md) for more details. ### Multi Modal Autonomous Agent Run the agent with multiple modalities useful for various real-world tasks in manufacturing, logistics, and health. @@ -1830,7 +1852,7 @@ Documentation is located here at: [docs.swarms.world](https://docs.swarms.world) ----- ## Folder Structure -The swarms package has been meticlously crafted for extreme use-ability and understanding, the swarms package is split up into various modules such as `swarms.agents` that holds pre-built agents, `swarms.structs` that holds a vast array of structures like `Agent` and multi agent structures. The 3 most important are `structs`, `models`, and `agents`. +The swarms package has been meticlously crafted for extreme use-ability and understanding, the swarms package is split up into various modules such as `swarms.agents` that holds pre-built agents, `swarms.structs` that holds a vast array of structures like `Agent` and multi agent structures. The 3 most important are `structs`, `models`, and `agents`. ```sh ├── __init__.py diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index a9b77dae7..40f13346c 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -3,6 +3,7 @@ from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm from swarms.structs.base_workflow import BaseWorkflow +from swarms.structs.async_workflow import AsyncWorkflow from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.conversation import Conversation from swarms.structs.graph_workflow import ( @@ -78,6 +79,7 @@ __all__ = [ "Agent", + "AsyncWorkflow", "AutoSwarm", "AutoSwarmRouter", "BaseStructure", diff --git a/swarms/structs/async_workflow.py b/swarms/structs/async_workflow.py new file mode 100644 index 000000000..cd3aef57b --- /dev/null +++ b/swarms/structs/async_workflow.py @@ -0,0 +1,62 @@ +import asyncio +from typing import Any, Callable, List, Optional +from swarms.structs.base_workflow import BaseWorkflow +from swarms.structs.agent import Agent +from swarms.utils.loguru_logger import logger + +class AsyncWorkflow(BaseWorkflow): + def __init__( + self, + name: str = "AsyncWorkflow", + agents: List[Agent] = None, + max_workers: int = 5, + dashboard: bool = False, + autosave: bool = False, + verbose: bool = False, + **kwargs + ): + super().__init__(agents=agents, **kwargs) + self.name = name + self.agents = agents or [] + self.max_workers = max_workers + self.dashboard = dashboard + self.autosave = autosave + self.verbose = verbose + self.task_pool = [] + self.results = [] + self.loop = None + + async def _execute_agent_task(self, agent: Agent, task: str) -> Any: + """Execute a single agent task asynchronously""" + try: + if self.verbose: + logger.info(f"Agent {agent.agent_name} processing task: {task}") + result = await agent.arun(task) + if self.verbose: + logger.info(f"Agent {agent.agent_name} completed task") + return result + except Exception as e: + logger.error(f"Error in agent {agent.agent_name}: {str(e)}") + return str(e) + + async def run(self, task: str) -> List[Any]: + """Run the workflow with all agents processing the task concurrently""" + if not self.agents: + raise ValueError("No agents provided to the workflow") + + try: + # Create tasks for all agents + tasks = [self._execute_agent_task(agent, task) for agent in self.agents] + + # Execute all tasks concurrently + self.results = await asyncio.gather(*tasks, return_exceptions=True) + + if self.autosave: + # Implement autosave logic here + pass + + return self.results + + except Exception as e: + logger.error(f"Error in workflow execution: {str(e)}") + raise \ No newline at end of file From 121b7e015d3a17f1f1235b10650ec75ff86c5565 Mon Sep 17 00:00:00 2001 From: Occupying-Mars Date: Fri, 15 Nov 2024 18:54:48 +0530 Subject: [PATCH 3/3] swarmrouter output file types --- swarms/structs/swarm_router.py | 58 ++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 7d1179d91..c26cddba8 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -80,6 +80,9 @@ def __init__( flow: str = None, return_json: bool = True, auto_generate_prompts: bool = False, + output_files: Dict[str, str] = None, # New parameter + output_formats: List[str] = None, # New parameter + output_dir: str = "swarm_outputs", # New parameter *args, **kwargs, ): @@ -94,6 +97,28 @@ def __init__( self.auto_generate_prompts = auto_generate_prompts self.logs = [] + # Initialize output settings + self.output_dir = Path(output_dir) + self.output_dir.mkdir(exist_ok=True) + + # Default output formats if none specified + self.output_formats = output_formats or ["json"] + + # Custom output file paths or use defaults + self.output_files = output_files or { + fmt: self.output_dir / f"{self.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{fmt}" + for fmt in self.output_formats + } + + # Format handlers + self._format_handlers = { + "json": self._to_json, + "yaml": self._to_yaml, + "md": self._to_markdown, + "csv": self._to_csv, + "txt": self._to_text + } + self.reliability_check() self._log( @@ -287,6 +312,7 @@ def run(self, task: str, *args, **kwargs) -> Any: task=task, metadata={"result": str(result)}, ) + self._save_outputs(result) return result except Exception as e: self._log( @@ -297,6 +323,38 @@ def run(self, task: str, *args, **kwargs) -> Any: ) raise + def _to_json(self, data: Any) -> str: + return json.dumps(data, indent=2) + + def _to_yaml(self, data: Any) -> str: + return yaml.dump(data) + + def _to_markdown(self, data: Any) -> str: + md = f"# {self.name} Results\n\n" + for agent_name, result in data.items(): + md += f"## {agent_name}\n\n{result}\n\n" + return md + + def _to_csv(self, data: Any) -> str: + if isinstance(data, dict): + df = pd.DataFrame.from_dict(data, orient='index') + return df.to_csv() + return "" + + def _to_text(self, data: Any) -> str: + return str(data) + + def _save_outputs(self, results: Any): + """Save results in all specified formats""" + for fmt, filepath in self.output_files.items(): + if fmt in self._format_handlers: + try: + formatted_data = self._format_handlers[fmt](results) + with open(filepath, 'w') as f: + f.write(formatted_data) + except Exception as e: + logger.error(f"Failed to save {fmt} output: {str(e)}") + def batch_run( self, tasks: List[str], *args, **kwargs ) -> List[Any]: