From 14fc0a9f8ad005a7bc1a6857422d1bf56fd864e9 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 17 Nov 2024 08:51:43 -0800 Subject: [PATCH] [LOGGING FIXES] --- swarms/agents/tool_agent.py | 4 +- swarms/artifacts/main_artifact.py | 4 +- swarms/structs/__init__.py | 37 ++- swarms/structs/base_swarm.py | 4 +- swarms/structs/base_workflow.py | 4 +- swarms/structs/company.py | 5 +- swarms/structs/federated_swarm.py | 393 ----------------------- swarms/structs/graph_workflow.py | 4 +- swarms/structs/multi_process_workflow.py | 105 +++++- swarms/structs/rearrange.py | 4 +- swarms/structs/round_robin.py | 4 +- swarms/structs/sequential_workflow.py | 4 +- swarms/structs/swarm_arange.py | 4 +- swarms/structs/swarm_load_balancer.py | 4 +- swarms/structs/swarm_net.py | 4 +- swarms/structs/swarm_registry.py | 4 +- swarms/structs/task.py | 4 +- swarms/tools/base_tool.py | 4 +- swarms/tools/func_calling_executor.py | 3 +- swarms/tools/pydantic_to_json.py | 4 +- swarms/tools/tool_parse_exec.py | 4 +- swarms/utils/async_file_creation.py | 46 +++ swarms/utils/calculate_func_metrics.py | 102 +++++- swarms/utils/file_processing.py | 127 +++++--- swarms/utils/parse_code.py | 20 +- swarms/utils/profile_func_2.py | 98 ------ 26 files changed, 426 insertions(+), 574 deletions(-) delete mode 100644 swarms/structs/federated_swarm.py diff --git a/swarms/agents/tool_agent.py b/swarms/agents/tool_agent.py index d05417f1a..34a316b35 100644 --- a/swarms/agents/tool_agent.py +++ b/swarms/agents/tool_agent.py @@ -2,7 +2,9 @@ from swarms.structs.agent import Agent from swarms.tools.json_former import Jsonformer -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="tool_agent") class ToolAgent(Agent): diff --git a/swarms/artifacts/main_artifact.py b/swarms/artifacts/main_artifact.py index d2009476b..5eaa939e3 100644 --- a/swarms/artifacts/main_artifact.py +++ b/swarms/artifacts/main_artifact.py @@ -1,11 +1,13 @@ import time -from swarms.utils.loguru_logger import logger import os import json from typing import List, Union, Dict, Any from pydantic import BaseModel, Field, validator from datetime import datetime from swarms.utils.file_processing import create_file_in_folder +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="main_artifact") class FileVersion(BaseModel): diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index e391a1d10..a660ed1ae 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -1,4 +1,5 @@ from swarms.structs.agent import Agent +from swarms.structs.agents_available import showcase_available_agents from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm @@ -19,15 +20,31 @@ parse_code_completion, ) from swarms.structs.message import Message - from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.multi_agent_collab import MultiAgentCollaboration +from swarms.structs.multi_agent_exec import ( + run_agent_with_timeout, + run_agents_concurrently, + run_agents_concurrently_async, + run_agents_concurrently_multiprocess, + run_agents_sequentially, + run_agents_with_different_tasks, + run_agents_with_resource_monitoring, + run_agents_with_tasks_concurrently, + run_single_agent, +) from swarms.structs.queue_swarm import TaskQueueSwarm from swarms.structs.rearrange import AgentRearrange, rearrange 
from swarms.structs.round_robin import RoundRobinSwarm from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm +from swarms.structs.swarm_arange import SwarmRearrange from swarms.structs.swarm_net import SwarmNetwork +from swarms.structs.swarm_router import ( + SwarmRouter, + SwarmType, + swarm_router, +) from swarms.structs.swarming_architectures import ( broadcast, circular_swarm, @@ -58,24 +75,6 @@ find_token_in_text, parse_tasks, ) -from swarms.structs.swarm_router import ( - SwarmRouter, - SwarmType, - swarm_router, -) -from swarms.structs.swarm_arange import SwarmRearrange -from swarms.structs.multi_agent_exec import ( - run_agents_concurrently, - run_agents_concurrently_async, - run_single_agent, - run_agents_concurrently_multiprocess, - run_agents_sequentially, - run_agents_with_different_tasks, - run_agent_with_timeout, - run_agents_with_resource_monitoring, - run_agents_with_tasks_concurrently, -) -from swarms.structs.agents_available import showcase_available_agents __all__ = [ "Agent", diff --git a/swarms/structs/base_swarm.py b/swarms/structs/base_swarm.py index 2f141213f..6e2242bed 100644 --- a/swarms/structs/base_swarm.py +++ b/swarms/structs/base_swarm.py @@ -20,13 +20,15 @@ from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.omni_agent_types import AgentType -from swarms.utils.loguru_logger import logger from pydantic import BaseModel from swarms.utils.pandas_utils import ( dict_to_dataframe, display_agents_info, pydantic_model_to_dataframe, ) +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="base_swarm") class BaseSwarm(ABC): diff --git a/swarms/structs/base_workflow.py b/swarms/structs/base_workflow.py index b5deb9160..b75bfe2c7 100644 --- a/swarms/structs/base_workflow.py +++ b/swarms/structs/base_workflow.py @@ -6,7 +6,9 @@ from swarms.structs.agent import Agent from swarms.structs.base_structure import BaseStructure from swarms.structs.task import Task -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger("base-workflow") class BaseWorkflow(BaseStructure): diff --git a/swarms/structs/company.py b/swarms/structs/company.py index ef576e80f..f7fb36b72 100644 --- a/swarms/structs/company.py +++ b/swarms/structs/company.py @@ -2,8 +2,11 @@ from typing import Dict, List, Optional, Union from swarms.structs.agent import Agent -from swarms.utils.loguru_logger import logger from swarms.structs.base_swarm import BaseSwarm +from swarms.utils.loguru_logger import initialize_logger + + +logger = initialize_logger("company-swarm") @dataclass diff --git a/swarms/structs/federated_swarm.py b/swarms/structs/federated_swarm.py deleted file mode 100644 index 6c5e09cad..000000000 --- a/swarms/structs/federated_swarm.py +++ /dev/null @@ -1,393 +0,0 @@ -from typing import List, Callable, Union, Optional -from loguru import logger -from swarms.structs.base_swarm import BaseSwarm -from queue import PriorityQueue -from concurrent.futures import ( - ThreadPoolExecutor, - as_completed, -) -import time -from pydantic import BaseModel, Field - - -class SwarmRunData(BaseModel): - """ - Pydantic model to capture metadata about each swarm's execution. 
- """ - - swarm_name: str - task: str - priority: int - start_time: Optional[float] = None - end_time: Optional[float] = None - duration: Optional[float] = None - status: str = "Pending" - retries: int = 0 - result: Optional[str] = None - exception: Optional[str] = None - - -class FederatedSwarmModel(BaseModel): - """ - Pydantic base model to capture and log data for the FederatedSwarm system. - """ - - task: str - swarms_data: List[SwarmRunData] = Field(default_factory=list) - - def add_swarm(self, swarm_name: str, task: str, priority: int): - swarm_data = SwarmRunData( - swarm_name=swarm_name, task=task, priority=priority - ) - self.swarms_data.append(swarm_data) - - def update_swarm_status( - self, - swarm_name: str, - status: str, - start_time: float = None, - end_time: float = None, - retries: int = 0, - result: str = None, - exception: str = None, - ): - for swarm in self.swarms_data: - if swarm.name == swarm_name: - swarm.status = status - if start_time: - swarm.start_time = start_time - if end_time: - swarm.end_time = end_time - swarm.duration = end_time - swarm.start_time - swarm.retries = retries - swarm.result = result - swarm.exception = exception - break - - -class FederatedSwarm: - def __init__( - self, - swarms: List[Union[BaseSwarm, Callable]], - max_workers: int = 4, - ): - """ - Initializes the FederatedSwarm with a list of swarms or callable objects and - sets up a priority queue and thread pool for concurrency. - - Args: - swarms (List[Union[BaseSwarm, Callable]]): A list of swarms (BaseSwarm) or callable objects. - max_workers (int): The maximum number of concurrent workers (threads) to run swarms in parallel. - """ - self.swarms = PriorityQueue() - self.max_workers = max_workers - self.thread_pool = ThreadPoolExecutor( - max_workers=self.max_workers - ) - self.task_queue = [] - self.future_to_swarm = {} - self.results = {} - self.validate_swarms(swarms) - - def init_metadata(self, task: str): - """ - Initializes the Pydantic base model to capture metadata about the current task and swarms. - """ - self.metadata = FederatedSwarmModel(task=task) - for priority, swarm in list(self.swarms.queue): - swarm_name = ( - swarm.__class__.__name__ - if hasattr(swarm, "__class__") - else str(swarm) - ) - self.metadata.add_swarm( - swarm_name=swarm_name, task=task, priority=priority - ) - logger.info(f"Metadata initialized for task '{task}'.") - - def validate_swarms( - self, swarms: List[Union[BaseSwarm, Callable]] - ): - """ - Validates and adds swarms to the priority queue, ensuring each swarm has a `run(task)` method. - - Args: - swarms (List[Union[BaseSwarm, Callable]]): List of swarms with an optional priority value. - """ - for swarm, priority in swarms: - if not callable(swarm): - raise TypeError(f"{swarm} is not callable.") - - if hasattr(swarm, "run"): - logger.info(f"{swarm} has a 'run' method.") - else: - raise AttributeError( - f"{swarm} does not have a 'run(task)' method." - ) - - self.swarms.put((priority, swarm)) - logger.info( - f"Swarm {swarm} added with priority {priority}." - ) - - def run_parallel( - self, - task: str, - timeout: Optional[float] = None, - retries: int = 0, - ): - """ - Runs all swarms in parallel with prioritization and optional timeout. - - Args: - task (str): The task to be passed to the `run` method of each swarm. - timeout (Optional[float]): Maximum time allowed for each swarm to run. - retries (int): Number of retries allowed for failed swarms. 
- """ - logger.info( - f"Running task '{task}' in parallel with timeout: {timeout}, retries: {retries}" - ) - self.init_metadata(task) - - while not self.swarms.empty(): - priority, swarm = self.swarms.get() - swarm_name = ( - swarm.__class__.__name__ - if hasattr(swarm, "__class__") - else str(swarm) - ) - future = self.thread_pool.submit( - self._run_with_retry, - swarm, - task, - retries, - timeout, - swarm_name, - ) - self.future_to_swarm[future] = swarm - - for future in as_completed(self.future_to_swarm): - swarm = self.future_to_swarm[future] - try: - result = future.result() - swarm_name = ( - swarm.__class__.__name__ - if hasattr(swarm, "__class__") - else str(swarm) - ) - self.metadata.update_swarm_status( - swarm_name=swarm_name, - status="Completed", - result=result, - ) - logger.info( - f"Swarm {swarm_name} completed successfully." - ) - except Exception as e: - swarm_name = ( - swarm.__class__.__name__ - if hasattr(swarm, "__class__") - else str(swarm) - ) - self.metadata.update_swarm_status( - swarm_name=swarm_name, - status="Failed", - exception=str(e), - ) - logger.error(f"Swarm {swarm_name} failed: {e}") - self.results[swarm] = "Failed" - - def run_sequentially( - self, - task: str, - retries: int = 0, - timeout: Optional[float] = None, - ): - """ - Runs all swarms sequentially in order of priority. - - Args: - task (str): The task to pass to the `run` method of each swarm. - retries (int): Number of retries for failed swarms. - timeout (Optional[float]): Optional time limit for each swarm. - """ - logger.info(f"Running task '{task}' sequentially.") - - while not self.swarms.empty(): - priority, swarm = self.swarms.get() - try: - logger.info( - f"Running swarm {swarm} with priority {priority}." - ) - self._run_with_retry(swarm, task, retries, timeout) - logger.info(f"Swarm {swarm} completed successfully.") - except Exception as e: - logger.error(f"Swarm {swarm} failed with error: {e}") - - def _run_with_retry( - self, - swarm: Union[BaseSwarm, Callable], - task: str, - retries: int, - timeout: Optional[float], - swarm_name: str, - ): - """ - Helper function to run a swarm with a retry mechanism and optional timeout. - - Args: - swarm (Union[BaseSwarm, Callable]): The swarm to run. - task (str): The task to pass to the swarm. - retries (int): The number of retries allowed for the swarm in case of failure. - timeout (Optional[float]): Maximum time allowed for the swarm to run. - swarm_name (str): Name of the swarm (used for metadata). - """ - attempts = 0 - start_time = time.time() - while attempts <= retries: - try: - logger.info( - f"Running swarm {swarm}. Attempt: {attempts + 1}" - ) - self.metadata.update_swarm_status( - swarm_name=swarm_name, - status="Running", - start_time=start_time, - ) - if hasattr(swarm, "run"): - if timeout: - start_time = time.time() - swarm.run(task) - duration = time.time() - start_time - if duration > timeout: - raise TimeoutError( - f"Swarm {swarm} timed out after {duration:.2f}s." 
- ) - else: - swarm.run(task) - else: - swarm(task) - end_time = time.time() - self.metadata.update_swarm_status( - swarm_name=swarm_name, - status="Completed", - end_time=end_time, - retries=attempts, - ) - return "Success" - except Exception as e: - logger.error(f"Swarm {swarm} failed: {e}") - attempts += 1 - if attempts > retries: - end_time = time.time() - self.metadata.update_swarm_status( - swarm_name=swarm_name, - status="Failed", - end_time=end_time, - retries=attempts, - exception=str(e), - ) - logger.error(f"Swarm {swarm} exhausted retries.") - raise - - def add_swarm( - self, swarm: Union[BaseSwarm, Callable], priority: int - ): - """ - Adds a new swarm to the FederatedSwarm at runtime. - - Args: - swarm (Union[BaseSwarm, Callable]): The swarm to add. - priority (int): The priority level for the swarm. - """ - self.swarms.put((priority, swarm)) - logger.info( - f"Swarm {swarm} added dynamically with priority {priority}." - ) - - def queue_task(self, task: str): - """ - Adds a task to the internal task queue for batch processing. - - Args: - task (str): The task to queue. - """ - self.task_queue.append(task) - logger.info(f"Task '{task}' added to the queue.") - - def process_task_queue(self): - """ - Processes all tasks in the task queue. - """ - for task in self.task_queue: - logger.info(f"Processing task: {task}") - self.run_parallel(task) - self.task_queue = [] - - def log_swarm_results(self): - """ - Logs the results of all swarms after execution. - """ - logger.info("Logging swarm results...") - for swarm, result in self.results.items(): - logger.info(f"Swarm {swarm}: {result}") - - def get_swarm_status(self) -> dict: - """ - Retrieves the status of each swarm (completed, running, failed). - - Returns: - dict: Dictionary containing swarm statuses. - """ - status = {} - for future, swarm in self.future_to_swarm.items(): - if future.done(): - status[swarm] = "Completed" - elif future.running(): - status[swarm] = "Running" - else: - status[swarm] = "Failed" - return status - - def cancel_running_swarms(self): - """ - Cancels all currently running swarms by shutting down the thread pool. 
- """ - logger.warning("Cancelling all running swarms...") - self.thread_pool.shutdown(wait=False) - logger.info("All running swarms cancelled.") - - -# Example Usage: - - -# class ExampleSwarm(BaseSwarm): -# def run(self, task: str): -# logger.info(f"ExampleSwarm is processing task: {task}") - - -# def example_callable(task: str): -# logger.info(f"Callable is processing task: {task}") - - -# if __name__ == "__main__": -# swarms = [(ExampleSwarm(), 1), (example_callable, 2)] -# federated_swarm = FederatedSwarm(swarms) - -# # Run in parallel -# federated_swarm.run_parallel( -# "Process data", timeout=10, retries=3 -# ) - -# # Run sequentially -# federated_swarm.run_sequentially("Process data sequentially") - -# # Log results -# federated_swarm.log_swarm_results() - -# # Get status of swarms -# status = federated_swarm.get_swarm_status() -# logger.info(f"Swarm statuses: {status}") - -# # Cancel running swarms (if needed) -# # federated_swarm.cancel_running_swarms() diff --git a/swarms/structs/graph_workflow.py b/swarms/structs/graph_workflow.py index 989175b73..803a96435 100644 --- a/swarms/structs/graph_workflow.py +++ b/swarms/structs/graph_workflow.py @@ -5,7 +5,9 @@ from pydantic.v1 import BaseModel, Field, validator from swarms.structs.agent import Agent # noqa: F401 -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="graph_workflow") class NodeType(str, Enum): diff --git a/swarms/structs/multi_process_workflow.py b/swarms/structs/multi_process_workflow.py index 44051d0a6..7b04c10e5 100644 --- a/swarms/structs/multi_process_workflow.py +++ b/swarms/structs/multi_process_workflow.py @@ -1,9 +1,12 @@ from multiprocessing import Manager, Pool, cpu_count -from typing import Sequence, Union, Callable +from typing import Sequence, Union, Callable, List +from concurrent.futures import ThreadPoolExecutor, as_completed from swarms.structs.agent import Agent from swarms.structs.base_workflow import BaseWorkflow -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="multi_process_workflow") class MultiProcessWorkflow(BaseWorkflow): @@ -13,7 +16,7 @@ class MultiProcessWorkflow(BaseWorkflow): Args: max_workers (int): The maximum number of workers to use for parallel processing. autosave (bool): Flag indicating whether to automatically save the workflow. - tasks (List[Task]): A list of Task objects representing the workflow tasks. + agents (List[Union[Agent, Callable]]): A list of Agent objects or callable functions representing the workflow tasks. *args: Additional positional arguments. **kwargs: Additional keyword arguments. @@ -132,7 +135,7 @@ def run(self, task: str, *args, **kwargs): callback=results_list.append, timeout=task.timeout, ) - for agent in self.agent + for agent in self.agents ] # Wait for all jobs to complete @@ -145,3 +148,97 @@ def run(self, task: str, *args, **kwargs): except Exception as error: logger.error(f"Error in run: {error}") return None + + async def async_run(self, task: str, *args, **kwargs): + """Asynchronously run the workflow. + + Args: + task (Task): The task to run. + *args: Additional positional arguments for the task execution. + **kwargs: Additional keyword arguments for the task execution. + + Returns: + List[Any]: The results of all executed tasks. 
+
+        """
+        try:
+            results = []
+            with ThreadPoolExecutor(
+                max_workers=self.max_workers
+            ) as executor:
+                futures = [
+                    executor.submit(
+                        self.execute_task, task, *args, **kwargs
+                    )
+                    for _ in range(len(self.agents))
+                ]
+                for future in as_completed(futures):
+                    result = future.result()
+                    results.append(result)
+
+            return results
+        except Exception as error:
+            logger.error(f"Error in async_run: {error}")
+            return None
+
+    def batched_run(
+        self, tasks: List[str], batch_size: int = 5, *args, **kwargs
+    ):
+        """Run tasks in batches.
+
+        Args:
+            tasks (List[str]): A list of tasks to run.
+            batch_size (int): The size of each batch.
+            *args: Additional positional arguments for the task execution.
+            **kwargs: Additional keyword arguments for the task execution.
+
+        Returns:
+            List[Any]: The results of all executed tasks.
+
+        """
+        try:
+            results = []
+            for i in range(0, len(tasks), batch_size):
+                batch = tasks[i : i + batch_size]
+                with Pool(processes=self.max_workers) as pool:
+                    results_list = pool.map(
+                        self.execute_task, batch
+                    )
+                    results.extend(results_list)
+
+            return results
+        except Exception as error:
+            logger.error(f"Error in batched_run: {error}")
+            return None
+
+    def concurrent_run(self, tasks: List[str], *args, **kwargs):
+        """Run tasks concurrently.
+
+        Args:
+            tasks (List[str]): A list of tasks to run.
+            *args: Additional positional arguments for the task execution.
+            **kwargs: Additional keyword arguments for the task execution.
+
+        Returns:
+            List[Any]: The results of all executed tasks.
+
+        """
+        try:
+            results = []
+            with ThreadPoolExecutor(
+                max_workers=self.max_workers
+            ) as executor:
+                futures = [
+                    executor.submit(
+                        self.execute_task, task, *args, **kwargs
+                    )
+                    for task in tasks
+                ]
+                for future in as_completed(futures):
+                    result = future.result()
+                    results.append(result)
+
+            return results
+        except Exception as error:
+            logger.error(f"Error in concurrent_run: {error}")
+            return None
diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py
index 231cab163..f3d8fa8c5 100644
--- a/swarms/structs/rearrange.py
+++ b/swarms/structs/rearrange.py
@@ -13,10 +13,12 @@
 from swarms.structs.agents_available import showcase_available_agents
 from swarms.structs.base_swarm import BaseSwarm
 from swarms.utils.add_docs_to_agents import handle_input_docs
-from swarms.utils.loguru_logger import logger
 from swarms.utils.wrapper_clusterop import (
     exec_callable_with_clusterops,
 )
+from swarms.utils.loguru_logger import initialize_logger
+
+logger = initialize_logger(log_folder="rearrange")
 
 # Literal of output types
 OutputType = Literal[
diff --git a/swarms/structs/round_robin.py b/swarms/structs/round_robin.py
index a2a2bd5dd..19198d3db 100644
--- a/swarms/structs/round_robin.py
+++ b/swarms/structs/round_robin.py
@@ -2,12 +2,14 @@
 from swarms.structs.base_swarm import BaseSwarm
 from typing import List
 from swarms.structs.agent import Agent
-from swarms.utils.loguru_logger import logger
 from pydantic import BaseModel, Field
 from typing import Optional
 from datetime import datetime
 from swarms.schemas.agent_step_schemas import ManySteps
 import tenacity
+from swarms.utils.loguru_logger import initialize_logger
+
+logger = initialize_logger("round-robin")
 
 datetime_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py
index 1fc0fe8ab..b3f289365 100644
--- a/swarms/structs/sequential_workflow.py
+++ b/swarms/structs/sequential_workflow.py
@@ -1,9 +1,11 @@
 from typing 
import List from swarms.structs.agent import Agent -from swarms.utils.loguru_logger import logger from swarms.structs.rearrange import AgentRearrange, OutputType from concurrent.futures import ThreadPoolExecutor, as_completed from swarms.structs.agents_available import showcase_available_agents +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="sequential_workflow") class SequentialWorkflow: diff --git a/swarms/structs/swarm_arange.py b/swarms/structs/swarm_arange.py index 4e57facd6..efb880adf 100644 --- a/swarms/structs/swarm_arange.py +++ b/swarms/structs/swarm_arange.py @@ -3,8 +3,10 @@ import uuid from typing import Any, Callable, Dict, List, Optional -from swarms.utils.loguru_logger import logger from swarms.utils.any_to_str import any_to_str +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="swarm_arange") def swarm_id(): diff --git a/swarms/structs/swarm_load_balancer.py b/swarms/structs/swarm_load_balancer.py index b7cfdb948..275da2c25 100644 --- a/swarms/structs/swarm_load_balancer.py +++ b/swarms/structs/swarm_load_balancer.py @@ -5,7 +5,9 @@ from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="swarm_load_balancer") class AgentLoadBalancer(BaseSwarm): diff --git a/swarms/structs/swarm_net.py b/swarms/structs/swarm_net.py index 33be00de9..dac0d0a2c 100644 --- a/swarms/structs/swarm_net.py +++ b/swarms/structs/swarm_net.py @@ -19,7 +19,9 @@ from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger("swarm-network") # Pydantic models diff --git a/swarms/structs/swarm_registry.py b/swarms/structs/swarm_registry.py index b35aafb11..a4db3cb42 100644 --- a/swarms/structs/swarm_registry.py +++ b/swarms/structs/swarm_registry.py @@ -1,6 +1,8 @@ from pydantic.v1 import BaseModel from typing import List, Callable -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="swarm_registry") class SwarmRegistry(BaseModel): diff --git a/swarms/structs/task.py b/swarms/structs/task.py index 70293426b..fc73dea96 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -9,8 +9,10 @@ from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.omni_agent_types import AgentType -from swarms.utils.loguru_logger import logger from typing import Optional +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="task") class Task(BaseModel): diff --git a/swarms/tools/base_tool.py b/swarms/tools/base_tool.py index 519ddc8cd..dcb819741 100644 --- a/swarms/tools/base_tool.py +++ b/swarms/tools/base_tool.py @@ -14,7 +14,9 @@ base_model_to_openai_function, multi_base_model_to_openai_function, ) -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="base_tool") ToolType = Union[BaseModel, Dict[str, Any], Callable[..., Any]] diff --git a/swarms/tools/func_calling_executor.py b/swarms/tools/func_calling_executor.py index 5cc0e4b52..65d95a738 100644 --- a/swarms/tools/func_calling_executor.py +++ 
b/swarms/tools/func_calling_executor.py @@ -1,7 +1,8 @@ import concurrent.futures from typing import Callable, Any, Dict, List -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger +logger = initialize_logger(log_folder="func_calling_executor") # def openai_tool_executor( # tools: List[Dict[str, Any]], diff --git a/swarms/tools/pydantic_to_json.py b/swarms/tools/pydantic_to_json.py index 7c64ea8e3..1f6521df4 100644 --- a/swarms/tools/pydantic_to_json.py +++ b/swarms/tools/pydantic_to_json.py @@ -2,7 +2,9 @@ from docstring_parser import parse from pydantic import BaseModel -from swarms.utils.loguru_logger import logger +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger("pydantic_to_json") def _remove_a_key(d: dict, remove_key: str) -> None: diff --git a/swarms/tools/tool_parse_exec.py b/swarms/tools/tool_parse_exec.py index 8686781a8..7cc4369f0 100644 --- a/swarms/tools/tool_parse_exec.py +++ b/swarms/tools/tool_parse_exec.py @@ -1,8 +1,10 @@ import json from typing import List, Any, Callable -from swarms.utils.loguru_logger import logger from swarms.utils.parse_code import extract_code_from_markdown +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="tool_parse_exec") def parse_and_execute_json( diff --git a/swarms/utils/async_file_creation.py b/swarms/utils/async_file_creation.py index 90832db3f..6c35e95dd 100644 --- a/swarms/utils/async_file_creation.py +++ b/swarms/utils/async_file_creation.py @@ -58,3 +58,49 @@ async def create_file_with_directory( os.makedirs(directory) await async_create_file(file_path, content) + + +def sync_create_file(file_path: str, content: str) -> None: + """ + Synchronously creates a file at the specified path and writes the given content to it. + + Args: + file_path (str): The path where the file will be created. + content (str): The content to be written to the file. + + Returns: + None + """ + asyncio.run(async_create_file(file_path, content)) + + +def sync_create_multiple_files( + file_paths: List[str], contents: List[str] +) -> None: + """ + Synchronously creates multiple files at the specified paths and writes the corresponding content to each file. + + Args: + file_paths (List[str]): A list of paths where the files will be created. + contents (List[str]): A list of content to be written to each file, corresponding to the file paths. + + Returns: + None + """ + asyncio.run(create_multiple_files(file_paths, contents)) + + +def sync_create_file_with_directory( + file_path: str, content: str +) -> None: + """ + Synchronously creates a file with the specified directory path and content. If the directory does not exist, it is created. + + Args: + file_path (str): The path of the file to be created, including the directory. + content (str): The content to be written to the file. 
+ + Returns: + None + """ + asyncio.run(create_file_with_directory(file_path, content)) diff --git a/swarms/utils/calculate_func_metrics.py b/swarms/utils/calculate_func_metrics.py index 1aacb3a99..bfb8a5288 100644 --- a/swarms/utils/calculate_func_metrics.py +++ b/swarms/utils/calculate_func_metrics.py @@ -1,7 +1,15 @@ import time +import tracemalloc +from functools import wraps +from typing import Any, Callable + import psutil +from loguru import logger from pydantic import BaseModel -from swarms.utils.loguru_logger import logger + +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="calculate_func_metrics") class FunctionMetrics(BaseModel): @@ -70,3 +78,95 @@ def wrapper(*args, **kwargs): return result, metrics return wrapper + + +def profile_all(func: Callable) -> Callable: + """ + A decorator to profile memory usage, CPU usage, and I/O operations + of a function and log the data using loguru. + + It combines tracemalloc for memory profiling, psutil for CPU and I/O operations, + and measures execution time. + + Args: + func (Callable): The function to be profiled. + + Returns: + Callable: The wrapped function with profiling enabled. + """ + + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + # Start memory tracking + tracemalloc.start() + + # Get initial CPU stats + process = psutil.Process() + initial_cpu_times = process.cpu_times() + + # Get initial I/O stats if available + try: + initial_io_counters = process.io_counters() + io_tracking_available = True + except AttributeError: + logger.warning( + "I/O counters not available on this platform." + ) + io_tracking_available = False + + # Start timing the function execution + start_time = time.time() + + # Execute the function + result = func(*args, **kwargs) + + # Stop timing + end_time = time.time() + execution_time = end_time - start_time + + # Get final CPU stats + final_cpu_times = process.cpu_times() + + # Get final I/O stats if available + if io_tracking_available: + final_io_counters = process.io_counters() + io_read_count = ( + final_io_counters.read_count + - initial_io_counters.read_count + ) + io_write_count = ( + final_io_counters.write_count + - initial_io_counters.write_count + ) + else: + io_read_count = io_write_count = 0 + + # Get memory usage statistics + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics("lineno") + + # Calculate CPU usage + cpu_usage = ( + final_cpu_times.user + - initial_cpu_times.user + + final_cpu_times.system + - initial_cpu_times.system + ) + + # Log the data + logger.info(f"Execution time: {execution_time:.4f} seconds") + logger.info(f"CPU usage: {cpu_usage:.2f} seconds") + if io_tracking_available: + logger.info( + f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}" + ) + logger.info("Top memory usage:") + for stat in top_stats[:10]: + logger.info(stat) + + # Stop memory tracking + tracemalloc.stop() + + return result + + return wrapper diff --git a/swarms/utils/file_processing.py b/swarms/utils/file_processing.py index e14918fd0..30e5dbf62 100644 --- a/swarms/utils/file_processing.py +++ b/swarms/utils/file_processing.py @@ -5,6 +5,28 @@ import re import shutil import tempfile +from swarms.utils.loguru_logger import initialize_logger + +logger = initialize_logger(log_folder="file_processing") + + +def check_if_folder_exists(folder_name: str) -> bool: + """ + Check if a folder exists at the specified path. + + Args: + folder_name (str): The path to the folder to check. 
+ + Returns: + bool: True if the folder exists, False otherwise. + """ + try: + return os.path.exists(folder_name) and os.path.isdir( + folder_name + ) + except Exception as e: + logger.error(f"Failed to check if folder exists: {e}") + return False def zip_workspace(workspace_path: str, output_filename: str): @@ -12,25 +34,33 @@ def zip_workspace(workspace_path: str, output_filename: str): Zips the specified workspace directory and returns the path to the zipped file. Ensure the output_filename does not have .zip extension as it's added by make_archive. """ - temp_dir = tempfile.mkdtemp() - # Remove .zip if present in output_filename to avoid duplication - base_output_path = os.path.join( - temp_dir, output_filename.replace(".zip", "") - ) - zip_path = shutil.make_archive( - base_output_path, "zip", workspace_path - ) - return zip_path # make_archive already appends .zip + try: + temp_dir = tempfile.mkdtemp() + # Remove .zip if present in output_filename to avoid duplication + base_output_path = os.path.join( + temp_dir, output_filename.replace(".zip", "") + ) + zip_path = shutil.make_archive( + base_output_path, "zip", workspace_path + ) + return zip_path # make_archive already appends .zip + except Exception as e: + logger.error(f"Failed to zip workspace: {e}") + return None def sanitize_file_path(file_path: str): """ Cleans and sanitizes the file path to be valid for Windows. """ - sanitized_path = file_path.replace("`", "").strip() - # Replace any invalid characters here with an underscore or remove them - sanitized_path = re.sub(r'[<>:"/\\|?*]', "_", sanitized_path) - return sanitized_path + try: + sanitized_path = file_path.replace("`", "").strip() + # Replace any invalid characters here with an underscore or remove them + sanitized_path = re.sub(r'[<>:"/\\|?*]', "_", sanitized_path) + return sanitized_path + except Exception as e: + logger.error(f"Failed to sanitize file path: {e}") + return None def load_json(json_string: str): @@ -43,11 +73,14 @@ def load_json(json_string: str): Returns: object: The Python object representing the JSON data. """ - json_data = json.loads(json_string) - return json_data + try: + json_data = json.loads(json_string) + return json_data + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + return None -# Create file that def create_file( content: str, file_path: str, @@ -59,9 +92,13 @@ def create_file( content (str): The content to be written to the file. file_path (str): The path to the file to be created. """ - with open(file_path, "w") as file: - file.write(content) - return file_path + try: + with open(file_path, "w") as file: + file.write(content) + return file_path + except Exception as e: + logger.error(f"Failed to create file: {e}") + return None def create_file_in_folder( @@ -78,15 +115,19 @@ def create_file_in_folder( Returns: str: The path of the created file. 
""" - if not os.path.exists(folder_path): - os.makedirs(folder_path) + try: + if not os.path.exists(folder_path): + os.makedirs(folder_path) - # Create the file in the folder - file_path = os.path.join(folder_path, file_name) - with open(file_path, "w") as file: - file.write(content) + # Create the file in the folder + file_path = os.path.join(folder_path, file_name) + with open(file_path, "w") as file: + file.write(content) - return file_path + return file_path + except Exception as e: + logger.error(f"Failed to create file in folder: {e}") + return None def zip_folders( @@ -103,16 +144,24 @@ def zip_folders( Returns: None """ - # Create a temporary directory - with tempfile.TemporaryDirectory() as temp_dir: - # Copy both folders into the temporary directory - shutil.copytree( - folder1_path, - os.path.join(temp_dir, os.path.basename(folder1_path)), - ) - shutil.copytree( - folder2_path, - os.path.join(temp_dir, os.path.basename(folder2_path)), - ) - # Create a zip file that contains the temporary directory - shutil.make_archive(zip_file_path, "zip", temp_dir) + try: + # Create a temporary directory + with tempfile.TemporaryDirectory() as temp_dir: + # Copy both folders into the temporary directory + shutil.copytree( + folder1_path, + os.path.join( + temp_dir, os.path.basename(folder1_path) + ), + ) + shutil.copytree( + folder2_path, + os.path.join( + temp_dir, os.path.basename(folder2_path) + ), + ) + # Create a zip file that contains the temporary directory + shutil.make_archive(zip_file_path, "zip", temp_dir) + except Exception as e: + logger.error(f"Failed to zip folders: {e}") + return None diff --git a/swarms/utils/parse_code.py b/swarms/utils/parse_code.py index 25cd62107..f295340c2 100644 --- a/swarms/utils/parse_code.py +++ b/swarms/utils/parse_code.py @@ -14,14 +14,30 @@ def extract_code_from_markdown(markdown_content: str) -> str: # Regular expression for fenced code blocks with optional language specifier pattern = r"```(?:\w+\n)?(.*?)```" + # Check if markdown_content is a string + if not isinstance(markdown_content, str): + raise TypeError("markdown_content must be a string") + # Find all matches of the pattern matches = re.finditer(pattern, markdown_content, re.DOTALL) # Extract the content inside the backticks - code_blocks = [match.group(1).strip() for match in matches] + code_blocks = [] + for match in matches: + code_block = match.group(1).strip() + # Remove any leading or trailing whitespace from the code block + code_block = code_block.strip() + # Remove any empty lines from the code block + code_block = "\n".join( + [line for line in code_block.split("\n") if line.strip()] + ) + code_blocks.append(code_block) # Concatenate all code blocks separated by newlines - return "\n".join(code_blocks) + if code_blocks: + return "\n\n".join(code_blocks) + else: + return "" # example = """ diff --git a/swarms/utils/profile_func_2.py b/swarms/utils/profile_func_2.py index a17c85aa4..e69de29bb 100644 --- a/swarms/utils/profile_func_2.py +++ b/swarms/utils/profile_func_2.py @@ -1,98 +0,0 @@ -from functools import wraps -from loguru import logger -import tracemalloc -import psutil -import time -from typing import Callable, Any - - -def profile_all(func: Callable) -> Callable: - """ - A decorator to profile memory usage, CPU usage, and I/O operations - of a function and log the data using loguru. - - It combines tracemalloc for memory profiling, psutil for CPU and I/O operations, - and measures execution time. - - Args: - func (Callable): The function to be profiled. 
- - Returns: - Callable: The wrapped function with profiling enabled. - """ - - @wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - # Start memory tracking - tracemalloc.start() - - # Get initial CPU stats - process = psutil.Process() - initial_cpu_times = process.cpu_times() - - # Get initial I/O stats if available - try: - initial_io_counters = process.io_counters() - io_tracking_available = True - except AttributeError: - logger.warning( - "I/O counters not available on this platform." - ) - io_tracking_available = False - - # Start timing the function execution - start_time = time.time() - - # Execute the function - result = func(*args, **kwargs) - - # Stop timing - end_time = time.time() - execution_time = end_time - start_time - - # Get final CPU stats - final_cpu_times = process.cpu_times() - - # Get final I/O stats if available - if io_tracking_available: - final_io_counters = process.io_counters() - io_read_count = ( - final_io_counters.read_count - - initial_io_counters.read_count - ) - io_write_count = ( - final_io_counters.write_count - - initial_io_counters.write_count - ) - else: - io_read_count = io_write_count = 0 - - # Get memory usage statistics - snapshot = tracemalloc.take_snapshot() - top_stats = snapshot.statistics("lineno") - - # Calculate CPU usage - cpu_usage = ( - final_cpu_times.user - - initial_cpu_times.user - + final_cpu_times.system - - initial_cpu_times.system - ) - - # Log the data - logger.info(f"Execution time: {execution_time:.4f} seconds") - logger.info(f"CPU usage: {cpu_usage:.2f} seconds") - if io_tracking_available: - logger.info( - f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}" - ) - logger.info("Top memory usage:") - for stat in top_stats[:10]: - logger.info(stat) - - # Stop memory tracking - tracemalloc.stop() - - return result - - return wrapper