From ec445e27212d552ffed22386a76bde9928b7e9a1 Mon Sep 17 00:00:00 2001 From: GabrielBarberini Date: Thu, 11 Jul 2024 21:11:14 -0300 Subject: [PATCH] refactor repositories accesses --- lib/repositories/environment.py | 37 ++++------ lib/repositories/flight.py | 37 ++++------ lib/repositories/motor.py | 37 ++++------ lib/repositories/repo.py | 118 ++++++++++++++++++++------------ lib/repositories/rocket.py | 37 ++++------ pyproject.toml | 1 - 6 files changed, 130 insertions(+), 137 deletions(-) diff --git a/lib/repositories/environment.py b/lib/repositories/environment.py index b15512d..351b9c6 100644 --- a/lib/repositories/environment.py +++ b/lib/repositories/environment.py @@ -1,8 +1,8 @@ from typing import Union from pymongo.errors import PyMongoError -from lib import logger, parse_error from lib.models.environment import Env -from lib.repositories.repo import Repository +from lib import logger +from lib.repositories.repo import Repository, RepositoryNotInitializedException class EnvRepository(Repository): @@ -42,14 +42,17 @@ def env_id(self, env_id: "str"): self._env_id = env_id async def insert_env(self, env_data: dict): - await self.collection.insert_one(env_data) + collection = self.get_collection() + await collection.insert_one(env_data) return self async def find_env(self, env_id: str): - return await self.collection.find_one({"env_id": env_id}) + collection = self.get_collection() + return await collection.find_one({"env_id": env_id}) async def delete_env(self, env_id: str): - await self.collection.delete_one({"env_id": env_id}) + collection = self.get_collection() + await collection.delete_one({"env_id": env_id}) return self async def create_env(self): @@ -64,13 +67,9 @@ async def create_env(self): environment_to_dict["env_id"] = self.env_id await self.insert_env(environment_to_dict) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.environment.create_env: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.environment.create_env: {exc_str}") - raise Exception(f"Error creating environment: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -90,13 +89,9 @@ async def get_env_by_id(self, env_id: str) -> Union[Env, None]: parsed_env = Env.parse_obj(read_env) if read_env else None self.env = parsed_env except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.environment.get_env: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.environment.get_env: {exc_str}") - raise Exception(f"Error getting environment: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -114,13 +109,9 @@ async def delete_env_by_id(self, env_id: str): try: await self.delete_env(env_id) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.environment.delete_env: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.environment.delete_env: {exc_str}") - raise Exception(f"Error deleting environment: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: diff --git a/lib/repositories/flight.py b/lib/repositories/flight.py index 43f80c1..ac68549 100644 --- a/lib/repositories/flight.py +++ b/lib/repositories/flight.py @@ -1,8 +1,8 @@ from typing import Union from pymongo.errors import PyMongoError -from lib import logger, parse_error +from lib import logger from lib.models.flight import Flight -from lib.repositories.repo import Repository +from lib.repositories.repo import Repository, RepositoryNotInitializedException class FlightRepository(Repository): @@ -42,13 +42,16 @@ def flight_id(self, flight_id: "str"): self._flight_id = flight_id async def insert_flight(self, flight_data: dict): - await self.collection.insert_one(flight_data) + collection = self.get_collection() + await collection.insert_one(flight_data) async def find_flight(self, flight_id: str): - return await self.collection.find_one({"flight_id": flight_id}) + collection = self.get_collection() + return await collection.find_one({"flight_id": flight_id}) async def delete_flight(self, flight_id: str): - await self.collection.delete_one({"flight_id": flight_id}) + collection = self.get_collection() + await collection.delete_one({"flight_id": flight_id}) return self async def create_flight( @@ -71,13 +74,9 @@ async def create_flight( flight_to_dict["rocket"]["motor"]["motor_kind"] = motor_kind await self.insert_flight(flight_to_dict) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.flight.create_flight: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.flight.create_flight: {exc_str}") - raise Exception(f"Error creating flight: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -99,13 +98,9 @@ async def get_flight_by_id(self, flight_id: str) -> Union[Flight, None]: ) self.flight = parsed_flight except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.flight.get_flight: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.flight.get_flight: {exc_str}") - raise Exception(f"Error getting flight: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -123,13 +118,9 @@ async def delete_flight_by_id(self, flight_id: str): try: await self.delete_flight(flight_id) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.flight.delete_flight: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.flight.delete_flight: {exc_str}") - raise Exception(f"Error deleting flight: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: diff --git a/lib/repositories/motor.py b/lib/repositories/motor.py index 12ed72c..806016a 100644 --- a/lib/repositories/motor.py +++ b/lib/repositories/motor.py @@ -1,8 +1,8 @@ from typing import Union from pymongo.errors import PyMongoError -from lib import logger, parse_error +from lib import logger from lib.models.motor import Motor -from lib.repositories.repo import Repository +from lib.repositories.repo import Repository, RepositoryNotInitializedException class MotorRepository(Repository): @@ -42,14 +42,17 @@ def motor_id(self, motor_id: "str"): self._motor_id = motor_id async def insert_motor(self, motor_data: dict): - await self.collection.insert_one(motor_data) + collection = self.get_collection() + await collection.insert_one(motor_data) return self async def find_motor(self, motor_id: str): - return await self.collection.find_one({"motor_id": motor_id}) + collection = self.get_collection() + return await collection.find_one({"motor_id": motor_id}) async def delete_motor(self, motor_id: str): - await self.collection.delete_one({"motor_id": motor_id}) + collection = self.get_collection() + await collection.delete_one({"motor_id": motor_id}) return self async def create_motor(self, motor_kind: str = "SOLID"): @@ -68,13 +71,9 @@ async def create_motor(self, motor_kind: str = "SOLID"): motor_to_dict["motor_kind"] = motor_kind await self.insert_motor(motor_to_dict) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.motor.create_motor: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.motor.create_motor: {exc_str}") - raise Exception(f"Error creating motor: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -94,13 +93,9 @@ async def get_motor_by_id(self, motor_id: str) -> Union[motor, None]: parsed_motor = Motor.parse_obj(read_motor) if read_motor else None self.motor = parsed_motor except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.motor.get_motor: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.motor.get_motor: {exc_str}") - raise Exception(f"Error getting motor: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -118,13 +113,9 @@ async def delete_motor_by_id(self, motor_id: str): try: await self.delete_motor(motor_id) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.motor.delete_motor: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.motor.delete_motor: {exc_str}") - raise Exception(f"Error deleting motor: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: diff --git a/lib/repositories/repo.py b/lib/repositories/repo.py index 39d217e..316f98f 100644 --- a/lib/repositories/repo.py +++ b/lib/repositories/repo.py @@ -1,7 +1,8 @@ import asyncio import threading -from lib import logger -from lib.secrets import Secrets +from app.lib import logger +from app.lib.secrets import Secrets +from pydantic import BaseModel from fastapi import HTTPException, status from motor.motor_asyncio import AsyncIOMotorClient from pymongo.server_api import ServerApi @@ -16,30 +17,80 @@ def __init__(self): ) +class RepoInstances(BaseModel): + instance: object + prospecting: int = 0 + + def add_prospecting(self): + self.prospecting += 1 + + def remove_prospecting(self): + self.prospecting -= 1 + + def get_prospecting(self): + return self.prospecting + + def get_instance(self): + return self.instance + + class Repository: """ Base class for all repositories (singleton) """ - _instances = {} - _thread_lock = threading.RLock() + _global_instances = {} + _global_thread_lock = threading.Lock() _global_async_lock = asyncio.Lock() def __new__(cls, *args, **kwargs): with ( - cls._thread_lock + cls._global_thread_lock ): # Ensure thread safety for singleton instance creation - if cls not in cls._instances: + if cls not in cls._global_instances: instance = super(Repository, cls).__new__(cls) - cls._instances[cls] = instance - return cls._instances[cls] - - def __init__(self, collection_name: str): + cls._global_instances[cls] = RepoInstances(instance=instance) + else: + cls._global_instances[cls].add_prospecting() + return cls._global_instances[cls].get_instance() + + @classmethod + def _update_prospecting(cls): + if cls in cls._global_instances: + cls._global_instances[cls].remove_prospecting() + + @classmethod + def _get_instance_prospecting(cls): + if cls in cls._global_instances: + return cls._global_instances[cls].get_prospecting() + return 0 + + def __init__(self, collection_name: str, *, min_pool_size: int = 1): if not getattr(self, '_initialized', False): + self._min_pool_size = min_pool_size self._collection_name = collection_name self._initialized_event = asyncio.Event() self._initialize() + @retry(stop=stop_after_attempt(5), wait=wait_fixed(0.2)) + async def _async_init(self): + async with ( + self._global_async_lock + ): # Hybrid safe locks for initialization + with self._global_thread_lock: + try: + self._initialize_connection() + self._initialized = True + except Exception as e: + logger.error("Initialization failed: %s", e, exc_info=True) + self._initialized = False + + def _on_init_done(self, future): + try: + future.result() + finally: + self._initialized_event.set() + def _initialize(self): if not asyncio.get_event_loop().is_running(): asyncio.run(self._async_init()) @@ -49,24 +100,6 @@ def _initialize(self): self._on_init_done ) - def _on_init_done(self, future): - try: - future.result() - except Exception as e: - logger.error("Initialization failed: %s", e, exc_info=True) - self._initialized = False - finally: - self._initialized_event.set() - - @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5)) - async def _async_init(self): - async with ( - self._global_async_lock - ): # Hybrid safe locks for initialization - with self._thread_lock: - self._initialize_connection() - self._initialized = True - async def __aenter__(self): await self._initialized_event.wait() # Ensure initialization is complete return self @@ -83,9 +116,10 @@ def _initialize_connection(self): self._client = AsyncIOMotorClient( self._connection_string, server_api=ServerApi("1"), - maxIdleTimeMS=15000, - connectTimeoutMS=15000, - serverSelectionTimeoutMS=30000, + maxIdleTimeMS=10000, + minPoolSize=self._min_pool_size, + maxPoolSize=100, + serverSelectionTimeoutMS=60000, ) self._collection = self._client.luchada_db[self._collection_name] logger.info("MongoDB client initialized for %s", self.__class__) @@ -101,14 +135,16 @@ async def _cleanup_instance(self): async with ( self._global_async_lock ): # Hybrid safe locks for finalization - with self._thread_lock: - if hasattr(self, '_client'): - self._client.close() - logger.info("Connection closed for %s", self.__class__) + with self._global_thread_lock: + self._global_instances.pop(self.__class__, None) self._initialized = False - self._instances.pop(self.__class__, None) + self._update_prospecting() + if not self._get_instance_prospecting(): + if self._client: + self._client.close() + logger.info("Connection closed for %s", self.__class__) + self._client = None - @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5)) def _get_connection_string(self): if not getattr(self, '_initialized', False): raise RepositoryNotInitializedException() @@ -118,7 +154,6 @@ def _get_connection_string(self): def connection_string(self): return self._get_connection_string() - @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5)) def _get_client(self): if not getattr(self, '_initialized', False): raise RepositoryNotInitializedException() @@ -128,12 +163,7 @@ def _get_client(self): def client(self): return self._get_client() - @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5)) - def _get_collection(self): + def get_collection(self): if not getattr(self, '_initialized', False): raise RepositoryNotInitializedException() return self._collection - - @property - def collection(self): - return self._get_collection() diff --git a/lib/repositories/rocket.py b/lib/repositories/rocket.py index 605a24d..2c74cee 100644 --- a/lib/repositories/rocket.py +++ b/lib/repositories/rocket.py @@ -1,8 +1,8 @@ from typing import Union from pymongo.errors import PyMongoError -from lib import logger, parse_error +from lib import logger from lib.models.rocket import Rocket -from lib.repositories.repo import Repository +from lib.repositories.repo import Repository, RepositoryNotInitializedException class RocketRepository(Repository): @@ -42,14 +42,17 @@ def rocket_id(self, rocket_id: "str"): self._rocket_id = rocket_id async def insert_rocket(self, rocket_data: dict): - await self.collection.insert_one(rocket_data) + collection = self.get_collection() + await collection.insert_one(rocket_data) return self async def find_rocket(self, rocket_id: str): - return await self.collection.find_one({"rocket_id": rocket_id}) + collection = self.get_collection() + return await collection.find_one({"rocket_id": rocket_id}) async def delete_rocket(self, rocket_id: str): - await self.collection.delete_one({"rocket_id": rocket_id}) + collection = self.get_collection() + await collection.delete_one({"rocket_id": rocket_id}) return self async def create_rocket( @@ -72,13 +75,9 @@ async def create_rocket( rocket_to_dict["motor"]["motor_kind"] = motor_kind await self.insert_rocket(rocket_to_dict) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.rocket.create_rocket: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.rocket.create_rocket: {exc_str}") - raise Exception(f"Error creating rocket: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -100,13 +99,9 @@ async def get_rocket_by_id(self, rocket_id: str) -> Union[Rocket, None]: ) self.rocket = parsed_rocket except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.rocket.get_rocket: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.rocket.get_rocket: {exc_str}") - raise Exception(f"Error getting rocket: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: @@ -124,13 +119,9 @@ async def delete_rocket_by_id(self, rocket_id: str): try: await self.delete_rocket(rocket_id) except PyMongoError as e: - exc_str = parse_error(e) - logger.error(f"repositories.rocket.delete_rocket: {exc_str}") raise e from e - except Exception as e: - exc_str = parse_error(e) - logger.error(f"repositories.rocket.delete_rocket: {exc_str}") - raise Exception(f"Error deleting rocket: {exc_str}") from e + except RepositoryNotInitializedException as e: + raise e from e else: return self finally: diff --git a/pyproject.toml b/pyproject.toml index ea7f9e5..190673c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,6 @@ disable = """ too-many-public-methods, too-many-instance-attributes, logging-fstring-interpolation, - broad-exception-raised, import-error, protected-access, unnecessary-dunder-call,