From d36c7f6c9c9afe0232bfd24e306da8fb259f836f Mon Sep 17 00:00:00 2001 From: Lucas Guillermou Date: Wed, 11 Dec 2024 20:09:18 +0100 Subject: [PATCH 1/2] Add lock on object creation --- backend/infrahub/core/schema/schema_branch.py | 39 ++++++++++ backend/infrahub/graphql/mutations/ipam.py | 76 +++++++++++-------- backend/infrahub/graphql/mutations/main.py | 62 ++++++++++++--- backend/infrahub/lock.py | 13 +++- backend/tests/functional/ipam/__init__.py | 0 .../ipam/test_load_concurrent_prefixes.py | 33 ++++++++ .../tests/unit/core/test_get_kinds_lock.py | 25 ++++++ pyproject.toml | 4 + 8 files changed, 207 insertions(+), 45 deletions(-) create mode 100644 backend/tests/functional/ipam/__init__.py create mode 100644 backend/tests/functional/ipam/test_load_concurrent_prefixes.py create mode 100644 backend/tests/unit/core/test_get_kinds_lock.py diff --git a/backend/infrahub/core/schema/schema_branch.py b/backend/infrahub/core/schema/schema_branch.py index 79bc16bef6..db66100341 100644 --- a/backend/infrahub/core/schema/schema_branch.py +++ b/backend/infrahub/core/schema/schema_branch.py @@ -53,6 +53,7 @@ from infrahub.utils import format_label from infrahub.visuals import select_color +from ...lock import build_object_lock_name from .constants import INTERNAL_SCHEMA_NODE_KINDS, SchemaNamespace from .schema_branch_computed import ComputedAttributes @@ -1725,3 +1726,41 @@ def generate_profile_from_node(self, node: NodeSchema) -> ProfileSchema: profile.attributes.append(attr) return profile + + def _get_kinds_to_lock_on_object_mutation(self, kind: str) -> list[str]: + """ + Return kinds for which we want to lock during creating / updating an object of a given schema node. + Lock should be performed on schema kind and its generics having a uniqueness_constraint defined. + If a generic uniqueness constraint is the same as the node schema one, + it means node schema overrided this constraint, in which case we only need to lock on the generic. + """ + + node_schema = self.get(name=kind) + + schema_uc = None + kinds = [] + if node_schema.uniqueness_constraints: + kinds.append(node_schema.kind) + schema_uc = node_schema.uniqueness_constraints + + if node_schema.is_generic_schema: + return kinds + + generics_kinds = node_schema.inherit_from + + node_schema_kind_removed = False + for generic_kind in generics_kinds: + generic_uc = self.get(name=generic_kind).uniqueness_constraints + if generic_uc: + kinds.append(generic_kind) + if not node_schema_kind_removed and generic_uc == schema_uc: + # Check whether we should remove original schema kind as it simply overrides uniqueness_constraint + # of a generic + kinds.pop(0) + node_schema_kind_removed = True + return kinds + + def get_kind_lock_names_on_object_mutation(self, kind: str) -> list[str]: + lock_kinds = self._get_kinds_to_lock_on_object_mutation(kind) + lock_names = [build_object_lock_name(kind) for kind in lock_kinds] + return lock_names diff --git a/backend/infrahub/graphql/mutations/ipam.py b/backend/infrahub/graphql/mutations/ipam.py index 3c9472cf9b..cfb03b0cd9 100644 --- a/backend/infrahub/graphql/mutations/ipam.py +++ b/backend/infrahub/graphql/mutations/ipam.py @@ -17,6 +17,8 @@ from infrahub.graphql.mutations.node_getter.interface import MutationNodeGetterInterface from infrahub.log import get_logger +from ... import lock +from ...lock import InfrahubMultiLock, build_object_lock_name from .main import InfrahubMutationMixin, InfrahubMutationOptions if TYPE_CHECKING: @@ -106,12 +108,14 @@ async def mutate_create( ip_address = ipaddress.ip_interface(data["address"]["value"]) namespace_id = await validate_namespace(db=db, data=data) - async with db.start_transaction() as dbt: - address = await cls.mutate_create_object(data=data, db=dbt, branch=branch) - reconciler = IpamReconciler(db=dbt, branch=branch) - reconciled_address = await reconciler.reconcile( - ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id() - ) + lock_name = build_object_lock_name(cls._meta.schema.kind) + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + async with db.start_transaction() as dbt: + address = await cls.mutate_create_object(data=data, db=dbt, branch=branch) + reconciler = IpamReconciler(db=dbt, branch=branch) + reconciled_address = await reconciler.reconcile( + ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id() + ) result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_address) @@ -141,13 +145,15 @@ async def mutate_update( namespace = await address.ip_namespace.get_peer(db) namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id) try: - async with db.start_transaction() as dbt: - address = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=address) - reconciler = IpamReconciler(db=dbt, branch=branch) - ip_address = ipaddress.ip_interface(address.address.value) - reconciled_address = await reconciler.reconcile( - ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id - ) + lock_name = build_object_lock_name(cls._meta.schema.kind) + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + async with db.start_transaction() as dbt: + address = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=address) + reconciler = IpamReconciler(db=dbt, branch=branch) + ip_address = ipaddress.ip_interface(address.address.value) + reconciled_address = await reconciler.reconcile( + ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id + ) result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_address) except ValidationError as exc: @@ -216,12 +222,14 @@ async def mutate_create( ip_network = ipaddress.ip_network(data["prefix"]["value"]) namespace_id = await validate_namespace(db=db, data=data) - async with db.start_transaction() as dbt: - prefix = await cls.mutate_create_object(data=data, db=dbt, branch=branch) - reconciler = IpamReconciler(db=dbt, branch=branch) - reconciled_prefix = await reconciler.reconcile( - ip_value=ip_network, namespace=namespace_id, node_uuid=prefix.get_id() - ) + lock_name = build_object_lock_name(cls._meta.schema.kind) + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + async with db.start_transaction() as dbt: + prefix = await cls.mutate_create_object(data=data, db=dbt, branch=branch) + reconciler = IpamReconciler(db=dbt, branch=branch) + reconciled_prefix = await reconciler.reconcile( + ip_value=ip_network, namespace=namespace_id, node_uuid=prefix.get_id() + ) result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_prefix) @@ -251,13 +259,15 @@ async def mutate_update( namespace = await prefix.ip_namespace.get_peer(db) namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id) try: - async with db.start_transaction() as dbt: - prefix = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=prefix) - reconciler = IpamReconciler(db=dbt, branch=branch) - ip_network = ipaddress.ip_network(prefix.prefix.value) - reconciled_prefix = await reconciler.reconcile( - ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id - ) + lock_name = build_object_lock_name(cls._meta.schema.kind) + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + async with db.start_transaction() as dbt: + prefix = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=prefix) + reconciler = IpamReconciler(db=dbt, branch=branch) + ip_network = ipaddress.ip_network(prefix.prefix.value) + reconciled_prefix = await reconciler.reconcile( + ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id + ) result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_prefix) except ValidationError as exc: @@ -302,12 +312,14 @@ async def mutate_delete( namespace_rels = await prefix.ip_namespace.get_relationships(db=db) namespace_id = namespace_rels[0].peer_id try: - async with context.db.start_transaction() as dbt: - reconciler = IpamReconciler(db=dbt, branch=branch) - ip_network = ipaddress.ip_network(prefix.prefix.value) - reconciled_prefix = await reconciler.reconcile( - ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=True - ) + lock_name = build_object_lock_name(cls._meta.schema.kind) + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + async with context.db.start_transaction() as dbt: + reconciler = IpamReconciler(db=dbt, branch=branch) + ip_network = ipaddress.ip_network(prefix.prefix.value) + reconciled_prefix = await reconciler.reconcile( + ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=True + ) except ValidationError as exc: raise ValueError(str(exc)) from exc diff --git a/backend/infrahub/graphql/mutations/main.py b/backend/infrahub/graphql/mutations/main.py index dac437fce2..1b17ec8029 100644 --- a/backend/infrahub/graphql/mutations/main.py +++ b/backend/infrahub/graphql/mutations/main.py @@ -7,7 +7,7 @@ from infrahub_sdk.utils import extract_fields from typing_extensions import Self -from infrahub import config +from infrahub import config, lock from infrahub.auth import validate_mutation_permissions_update_node from infrahub.core import registry from infrahub.core.constants import MutationAction @@ -25,6 +25,7 @@ from infrahub.log import get_log_data, get_logger from infrahub.worker import WORKER_IDENTITY +from ...lock import InfrahubMultiLock from .node_getter.by_default_filter import MutationNodeGetterByDefaultFilter from .node_getter.by_hfid import MutationNodeGetterByHfid from .node_getter.by_id import MutationNodeGetterById @@ -134,6 +135,20 @@ async def _refresh_for_profile_update( ) return obj + @classmethod + async def _call_mutate_create_object(cls, data: InputObjectType, db: InfrahubDatabase, branch: Branch): + """ + Wrapper around mutate_create_object to potentially activate locking. + """ + lock_names = registry.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation( + cls._meta.schema.kind + ) + if lock_names: + async with InfrahubMultiLock(lock_registry=lock.registry, locks=lock_names): + return await cls.mutate_create_object(data=data, db=db, branch=branch) + + return await cls.mutate_create_object(data=data, db=db, branch=branch) + @classmethod async def mutate_create( cls, @@ -144,7 +159,7 @@ async def mutate_create( ) -> tuple[Node, Self]: context: GraphqlContext = info.context db = database or context.db - obj = await cls.mutate_create_object(data=data, db=db, branch=branch) + obj = await cls._call_mutate_create_object(data=data, db=db, branch=branch) result = await cls.mutate_create_to_graphql(info=info, db=db, obj=obj) return obj, result @@ -189,6 +204,41 @@ async def mutate_create_to_graphql(cls, info: GraphQLResolveInfo, db: InfrahubDa result["object"] = await obj.to_graphql(db=db, fields=fields.get("object", {})) return cls(**result) + @classmethod + async def _call_mutate_update( + cls, + info: GraphQLResolveInfo, + data: InputObjectType, + branch: Branch, + db: InfrahubDatabase, + obj: Node, + ) -> tuple[Node, Self]: + """ + Wrapper around mutate_update to potentially activate locking and call it within a database transaction. + """ + + lock_names = registry.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation( + cls._meta.schema.kind + ) + + if db.is_transaction: + if lock_names: + async with InfrahubMultiLock(lock_registry=lock.registry, locks=lock_names): + obj = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=obj) + else: + obj = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=obj) + result = await cls.mutate_update_to_graphql(db=db, info=info, obj=obj) + return obj, result + + async with db.start_transaction() as dbt: + if lock_names: + async with InfrahubMultiLock(lock_registry=lock.registry, locks=lock_names): + obj = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=obj) + else: + obj = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=obj) + result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=obj) + return obj, result + @classmethod @retry_db_transaction(name="object_update") async def mutate_update( @@ -207,13 +257,7 @@ async def mutate_update( ) try: - if db.is_transaction: - obj = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=obj) - result = await cls.mutate_update_to_graphql(db=db, info=info, obj=obj) - else: - async with db.start_transaction() as dbt: - obj = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=obj) - result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=obj) + obj, result = await cls._call_mutate_update(info=info, data=data, db=db, branch=branch, obj=obj) except ValidationError as exc: raise ValueError(str(exc)) from exc diff --git a/backend/infrahub/lock.py b/backend/infrahub/lock.py index 5ee28cb0eb..ae1237bf13 100644 --- a/backend/infrahub/lock.py +++ b/backend/infrahub/lock.py @@ -18,6 +18,7 @@ from infrahub.services import InfrahubServices + registry: InfrahubLockRegistry = None @@ -46,8 +47,8 @@ class InfrahubMultiLock: """Context manager to allow multiple locks to be reserved together""" - def __init__(self, _registry: InfrahubLockRegistry, locks: Optional[list[str]] = None) -> None: - self.registry = _registry + def __init__(self, lock_registry: InfrahubLockRegistry, locks: Optional[list[str]] = None) -> None: + self.registry = lock_registry self.locks = locks or [] async def __aenter__(self): @@ -245,12 +246,16 @@ async def local_schema_wait(self) -> None: await self.get(name=LOCAL_SCHEMA_LOCK).event.wait() def global_schema_lock(self) -> InfrahubMultiLock: - return InfrahubMultiLock(_registry=self, locks=[LOCAL_SCHEMA_LOCK, GLOBAL_SCHEMA_LOCK]) + return InfrahubMultiLock(lock_registry=self, locks=[LOCAL_SCHEMA_LOCK, GLOBAL_SCHEMA_LOCK]) def global_graph_lock(self) -> InfrahubMultiLock: - return InfrahubMultiLock(_registry=self, locks=[LOCAL_SCHEMA_LOCK, GLOBAL_GRAPH_LOCK, GLOBAL_SCHEMA_LOCK]) + return InfrahubMultiLock(lock_registry=self, locks=[LOCAL_SCHEMA_LOCK, GLOBAL_GRAPH_LOCK, GLOBAL_SCHEMA_LOCK]) def initialize_lock(local_only: bool = False, service: Optional[InfrahubServices] = None) -> None: global registry # pylint: disable=global-statement registry = InfrahubLockRegistry(local_only=local_only, service=service) + + +def build_object_lock_name(name: str) -> str: + return f"global.object.{name}" diff --git a/backend/tests/functional/ipam/__init__.py b/backend/tests/functional/ipam/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/tests/functional/ipam/test_load_concurrent_prefixes.py b/backend/tests/functional/ipam/test_load_concurrent_prefixes.py new file mode 100644 index 0000000000..d7fe59327e --- /dev/null +++ b/backend/tests/functional/ipam/test_load_concurrent_prefixes.py @@ -0,0 +1,33 @@ +import ipaddress + +from infrahub.database import InfrahubDatabase +from tests.integration.ipam.base import TestIpam + + +# See https://github.com/opsmill/infrahub/issues/4523 +class TestLoadConcurrentPrefixes(TestIpam): + async def test_load_concurrent_prefixes( + self, + db: InfrahubDatabase, + default_branch, + client, + default_ipnamespace, + register_ipam_schema, + ): + prefixes_batch = await client.create_batch() + network_8 = ipaddress.IPv4Network("10.0.0.0/8") + networks_16 = list(network_8.subnets(new_prefix=16)) + + networks = [network_8] + networks_16[0:10] + + for network in networks: + prefix = await client.create("IpamIPPrefix", prefix=f"{network}") + prefixes_batch.add(task=prefix.save, node=prefix, allow_upsert=True) + + async for _, _ in prefixes_batch.execute(): + pass + + nodes = await client.all("IpamIPPrefix", prefetch_relationships=True, populate_store=True) + for n in nodes: + if n.prefix.value != network_8: + assert n.parent.peer.prefix.value == network_8 diff --git a/backend/tests/unit/core/test_get_kinds_lock.py b/backend/tests/unit/core/test_get_kinds_lock.py new file mode 100644 index 0000000000..11829b69d4 --- /dev/null +++ b/backend/tests/unit/core/test_get_kinds_lock.py @@ -0,0 +1,25 @@ +from infrahub.core import registry +from infrahub.database import InfrahubDatabase +from infrahub.lock import get_kinds_to_lock_on_object_mutation +from tests.helpers.test_app import TestInfrahubApp + + +class TestGetKindsLock(TestInfrahubApp): + async def test_get_kinds_lock( + self, + db: InfrahubDatabase, + default_branch, + register_core_models_schema, + client, + ): + # CoreCredential has no uniqueness_constraint, but generic CorePasswordCredential has one + core_password_credential_node = registry.schema.get("CorePasswordCredential") + assert get_kinds_to_lock_on_object_mutation(core_password_credential_node) == ["CoreCredential"] + + # 3 generics but only GenericAccount has a uniqueness_constraint + core_account_node = registry.schema.get("CoreAccount") + assert get_kinds_to_lock_on_object_mutation(core_account_node) == ["CoreGenericAccount"] + + # No uniqueness_constraint, no generic + core_account_node = registry.schema.get("BuiltinIPPrefix") + assert get_kinds_to_lock_on_object_mutation(core_account_node) == [] diff --git a/pyproject.toml b/pyproject.toml index 20c58855f6..2a9655088b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -229,6 +229,10 @@ disallow_untyped_defs = false module = "tests.integration.*" disallow_untyped_defs = false +[[tool.mypy.overrides]] +module = "tests.functional.*" +disallow_untyped_defs = false + [[tool.mypy.overrides]] module = "infrahub.core.attribute" ignore_errors = true From 3e3f18731ade453f77e7f4a76f3898bc491e6d18 Mon Sep 17 00:00:00 2001 From: Lucas Guillermou Date: Fri, 13 Dec 2024 11:49:13 +0100 Subject: [PATCH 2/2] Only lock on main branch + refactoring --- backend/infrahub/core/schema/schema_branch.py | 7 +- backend/infrahub/graphql/mutations/ipam.py | 187 +++++++++++++----- backend/infrahub/graphql/mutations/main.py | 8 +- .../{integration => functional}/ipam/base.py | 0 .../ipam/conftest.py | 0 .../ipam/test_ipam_merge_reconcile.py | 0 .../ipam/test_ipam_rebase_reconcile.py | 0 .../ipam/test_ipam_utilization.py | 2 +- .../ipam/test_load_concurrent_prefixes.py | 3 +- .../ipam/test_proposed_change_reconcile.py | 0 backend/tests/integration/ipam/__init__.py | 0 .../tests/unit/core/test_get_kinds_lock.py | 13 +- 12 files changed, 162 insertions(+), 58 deletions(-) rename backend/tests/{integration => functional}/ipam/base.py (100%) rename backend/tests/{integration => functional}/ipam/conftest.py (100%) rename backend/tests/{integration => functional}/ipam/test_ipam_merge_reconcile.py (100%) rename backend/tests/{integration => functional}/ipam/test_ipam_rebase_reconcile.py (100%) rename backend/tests/{integration => functional}/ipam/test_ipam_utilization.py (99%) rename backend/tests/{integration => functional}/ipam/test_proposed_change_reconcile.py (100%) delete mode 100644 backend/tests/integration/ipam/__init__.py diff --git a/backend/infrahub/core/schema/schema_branch.py b/backend/infrahub/core/schema/schema_branch.py index db66100341..0674ddf058 100644 --- a/backend/infrahub/core/schema/schema_branch.py +++ b/backend/infrahub/core/schema/schema_branch.py @@ -62,6 +62,8 @@ if TYPE_CHECKING: from pydantic import ValidationInfo + from ..branch import Branch + # pylint: disable=redefined-builtin,too-many-public-methods,too-many-lines @@ -1760,7 +1762,10 @@ def _get_kinds_to_lock_on_object_mutation(self, kind: str) -> list[str]: node_schema_kind_removed = True return kinds - def get_kind_lock_names_on_object_mutation(self, kind: str) -> list[str]: + def get_kind_lock_names_on_object_mutation(self, kind: str, branch: Branch) -> list[str]: + if not branch.is_default: + # Do not lock on other branches as objects validations will be performed at least when merging in main branch. + return [] lock_kinds = self._get_kinds_to_lock_on_object_mutation(kind) lock_names = [build_object_lock_name(kind) for kind in lock_kinds] return lock_names diff --git a/backend/infrahub/graphql/mutations/ipam.py b/backend/infrahub/graphql/mutations/ipam.py index cfb03b0cd9..73b50c7c7f 100644 --- a/backend/infrahub/graphql/mutations/ipam.py +++ b/backend/infrahub/graphql/mutations/ipam.py @@ -1,10 +1,12 @@ import ipaddress +from ipaddress import IPv4Interface from typing import TYPE_CHECKING, Any, Optional from graphene import InputObjectType, Mutation from graphql import GraphQLResolveInfo from typing_extensions import Self +from infrahub import lock from infrahub.core import registry from infrahub.core.branch import Branch from infrahub.core.constants import InfrahubKind @@ -15,10 +17,9 @@ from infrahub.database import InfrahubDatabase, retry_db_transaction from infrahub.exceptions import NodeNotFoundError, ValidationError from infrahub.graphql.mutations.node_getter.interface import MutationNodeGetterInterface +from infrahub.lock import InfrahubMultiLock, build_object_lock_name from infrahub.log import get_logger -from ... import lock -from ...lock import InfrahubMultiLock, build_object_lock_name from .main import InfrahubMutationMixin, InfrahubMutationOptions if TYPE_CHECKING: @@ -94,6 +95,29 @@ def __init_subclass_with_meta__( # pylint: disable=arguments-differ super().__init_subclass_with_meta__(_meta=_meta, **options) + @staticmethod + def _get_lock_name(namespace_id: str, branch: Branch) -> str | None: + if not branch.is_default: + # Do not lock on other branches as reconciliation will be performed at least when merging in main branch. + return None + return build_object_lock_name(InfrahubKind.IPADDRESS + "_" + namespace_id) + + @classmethod + async def _mutate_create_object_and_reconcile( + cls, + data: InputObjectType, + branch: Branch, + db: InfrahubDatabase, + ip_address: IPv4Interface | ipaddress.IPv6Interface, + namespace_id: str, + ) -> Node: + address = await cls.mutate_create_object(data=data, db=db, branch=branch) + reconciler = IpamReconciler(db=db, branch=branch) + reconciled_address = await reconciler.reconcile( + ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id() + ) + return reconciled_address + @classmethod @retry_db_transaction(name="ipaddress_create") async def mutate_create( @@ -108,19 +132,38 @@ async def mutate_create( ip_address = ipaddress.ip_interface(data["address"]["value"]) namespace_id = await validate_namespace(db=db, data=data) - lock_name = build_object_lock_name(cls._meta.schema.kind) - async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): - async with db.start_transaction() as dbt: - address = await cls.mutate_create_object(data=data, db=dbt, branch=branch) - reconciler = IpamReconciler(db=dbt, branch=branch) - reconciled_address = await reconciler.reconcile( - ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id() + async with db.start_transaction() as dbt: + if lock_name := cls._get_lock_name(namespace_id, branch): + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + reconciled_address = await cls._mutate_create_object_and_reconcile( + data=data, branch=branch, db=dbt, ip_address=ip_address, namespace_id=namespace_id + ) + else: + reconciled_address = await cls._mutate_create_object_and_reconcile( + data=data, branch=branch, db=dbt, ip_address=ip_address, namespace_id=namespace_id ) - - result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_address) + result = await cls.mutate_create_to_graphql(info=info, db=dbt, obj=reconciled_address) return reconciled_address, result + @classmethod + async def _mutate_update_object_and_reconcile( + cls, + info: GraphQLResolveInfo, + data: InputObjectType, + branch: Branch, + db: InfrahubDatabase, + address: Node, + namespace_id: str, + ) -> Node: + address = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=address) + reconciler = IpamReconciler(db=db, branch=branch) + ip_address = ipaddress.ip_interface(address.address.value) + reconciled_address = await reconciler.reconcile( + ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id + ) + return reconciled_address + @classmethod @retry_db_transaction(name="ipaddress_update") async def mutate_update( @@ -145,14 +188,15 @@ async def mutate_update( namespace = await address.ip_namespace.get_peer(db) namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id) try: - lock_name = build_object_lock_name(cls._meta.schema.kind) - async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): - async with db.start_transaction() as dbt: - address = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=address) - reconciler = IpamReconciler(db=dbt, branch=branch) - ip_address = ipaddress.ip_interface(address.address.value) - reconciled_address = await reconciler.reconcile( - ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id + async with db.start_transaction() as dbt: + if lock_name := cls._get_lock_name(namespace_id, branch): + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + reconciled_address = await cls._mutate_update_object_and_reconcile( + info=info, data=data, branch=branch, address=address, namespace_id=namespace_id, db=dbt + ) + else: + reconciled_address = await cls._mutate_update_object_and_reconcile( + info=info, data=data, branch=branch, address=address, namespace_id=namespace_id, db=dbt ) result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_address) @@ -208,6 +252,26 @@ def __init_subclass_with_meta__( # pylint: disable=arguments-differ super().__init_subclass_with_meta__(_meta=_meta, **options) + @staticmethod + def _get_lock_name(namespace_id: str, branch: Branch) -> str | None: + if not branch.is_default: + # Do not lock on other branches as reconciliation will be performed at least when merging in main branch. + return None + return build_object_lock_name(InfrahubKind.IPPREFIX + "_" + namespace_id) + + @classmethod + async def _mutate_create_object_and_reconcile( + cls, + data: InputObjectType, + branch: Branch, + db: InfrahubDatabase, + namespace_id: str, + ) -> Node: + prefix = await cls.mutate_create_object(data=data, db=db, branch=branch) + return await cls._reconcile_prefix( + branch=branch, db=db, prefix=prefix, namespace_id=namespace_id, is_delete=False + ) + @classmethod @retry_db_transaction(name="ipprefix_create") async def mutate_create( @@ -219,22 +283,38 @@ async def mutate_create( ) -> tuple[Node, Self]: context: GraphqlContext = info.context db = database or context.db - ip_network = ipaddress.ip_network(data["prefix"]["value"]) namespace_id = await validate_namespace(db=db, data=data) - lock_name = build_object_lock_name(cls._meta.schema.kind) - async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): - async with db.start_transaction() as dbt: - prefix = await cls.mutate_create_object(data=data, db=dbt, branch=branch) - reconciler = IpamReconciler(db=dbt, branch=branch) - reconciled_prefix = await reconciler.reconcile( - ip_value=ip_network, namespace=namespace_id, node_uuid=prefix.get_id() + async with db.start_transaction() as dbt: + if lock_name := cls._get_lock_name(namespace_id, branch): + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + reconciled_prefix = await cls._mutate_create_object_and_reconcile( + data=data, branch=branch, db=dbt, namespace_id=namespace_id + ) + else: + reconciled_prefix = await cls._mutate_create_object_and_reconcile( + data=data, branch=branch, db=dbt, namespace_id=namespace_id ) - result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_prefix) + result = await cls.mutate_create_to_graphql(info=info, db=dbt, obj=reconciled_prefix) return reconciled_prefix, result + @classmethod + async def _mutate_update_object_and_reconcile( + cls, + info: GraphQLResolveInfo, + data: InputObjectType, + branch: Branch, + db: InfrahubDatabase, + prefix: Node, + namespace_id: str, + ) -> Node: + prefix = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=prefix) + return await cls._reconcile_prefix( + branch=branch, db=db, prefix=prefix, namespace_id=namespace_id, is_delete=False + ) + @classmethod @retry_db_transaction(name="ipprefix_update") async def mutate_update( @@ -259,16 +339,16 @@ async def mutate_update( namespace = await prefix.ip_namespace.get_peer(db) namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id) try: - lock_name = build_object_lock_name(cls._meta.schema.kind) - async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): - async with db.start_transaction() as dbt: - prefix = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=prefix) - reconciler = IpamReconciler(db=dbt, branch=branch) - ip_network = ipaddress.ip_network(prefix.prefix.value) - reconciled_prefix = await reconciler.reconcile( - ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id + async with db.start_transaction() as dbt: + if lock_name := cls._get_lock_name(namespace_id, branch): + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + reconciled_prefix = await cls._mutate_update_object_and_reconcile( + info=info, data=data, prefix=prefix, db=dbt, namespace_id=namespace_id, branch=branch + ) + else: + reconciled_prefix = await cls._mutate_update_object_and_reconcile( + info=info, data=data, prefix=prefix, db=dbt, namespace_id=namespace_id, branch=branch ) - result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_prefix) except ValidationError as exc: raise ValueError(str(exc)) from exc @@ -294,6 +374,22 @@ async def mutate_upsert( return prefix, result, created + @classmethod + async def _reconcile_prefix( + cls, + branch: Branch, + db: InfrahubDatabase, + prefix: Node, + namespace_id: str, + is_delete: bool, + ) -> Node: + reconciler = IpamReconciler(db=db, branch=branch) + ip_network = ipaddress.ip_network(prefix.prefix.value) + reconciled_prefix = await reconciler.reconcile( + ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=is_delete + ) + return reconciled_prefix + @classmethod @retry_db_transaction(name="ipprefix_delete") async def mutate_delete( @@ -312,14 +408,17 @@ async def mutate_delete( namespace_rels = await prefix.ip_namespace.get_relationships(db=db) namespace_id = namespace_rels[0].peer_id try: - lock_name = build_object_lock_name(cls._meta.schema.kind) - async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): - async with context.db.start_transaction() as dbt: - reconciler = IpamReconciler(db=dbt, branch=branch) - ip_network = ipaddress.ip_network(prefix.prefix.value) - reconciled_prefix = await reconciler.reconcile( - ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=True + async with context.db.start_transaction() as dbt: + if lock_name := cls._get_lock_name(namespace_id, branch): + async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]): + reconciled_prefix = await cls._reconcile_prefix( + branch=branch, db=dbt, prefix=prefix, namespace_id=namespace_id, is_delete=True + ) + else: + reconciled_prefix = await cls._reconcile_prefix( + branch=branch, db=dbt, prefix=prefix, namespace_id=namespace_id, is_delete=True ) + except ValidationError as exc: raise ValueError(str(exc)) from exc diff --git a/backend/infrahub/graphql/mutations/main.py b/backend/infrahub/graphql/mutations/main.py index 1b17ec8029..84f6a1d531 100644 --- a/backend/infrahub/graphql/mutations/main.py +++ b/backend/infrahub/graphql/mutations/main.py @@ -140,8 +140,8 @@ async def _call_mutate_create_object(cls, data: InputObjectType, db: InfrahubDat """ Wrapper around mutate_create_object to potentially activate locking. """ - lock_names = registry.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation( - cls._meta.schema.kind + lock_names = db.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation( + kind=cls._meta.schema.kind, branch=branch ) if lock_names: async with InfrahubMultiLock(lock_registry=lock.registry, locks=lock_names): @@ -217,8 +217,8 @@ async def _call_mutate_update( Wrapper around mutate_update to potentially activate locking and call it within a database transaction. """ - lock_names = registry.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation( - cls._meta.schema.kind + lock_names = db.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation( + kind=cls._meta.schema.kind, branch=branch ) if db.is_transaction: diff --git a/backend/tests/integration/ipam/base.py b/backend/tests/functional/ipam/base.py similarity index 100% rename from backend/tests/integration/ipam/base.py rename to backend/tests/functional/ipam/base.py diff --git a/backend/tests/integration/ipam/conftest.py b/backend/tests/functional/ipam/conftest.py similarity index 100% rename from backend/tests/integration/ipam/conftest.py rename to backend/tests/functional/ipam/conftest.py diff --git a/backend/tests/integration/ipam/test_ipam_merge_reconcile.py b/backend/tests/functional/ipam/test_ipam_merge_reconcile.py similarity index 100% rename from backend/tests/integration/ipam/test_ipam_merge_reconcile.py rename to backend/tests/functional/ipam/test_ipam_merge_reconcile.py diff --git a/backend/tests/integration/ipam/test_ipam_rebase_reconcile.py b/backend/tests/functional/ipam/test_ipam_rebase_reconcile.py similarity index 100% rename from backend/tests/integration/ipam/test_ipam_rebase_reconcile.py rename to backend/tests/functional/ipam/test_ipam_rebase_reconcile.py diff --git a/backend/tests/integration/ipam/test_ipam_utilization.py b/backend/tests/functional/ipam/test_ipam_utilization.py similarity index 99% rename from backend/tests/integration/ipam/test_ipam_utilization.py rename to backend/tests/functional/ipam/test_ipam_utilization.py index 7f404e9016..6536dddd48 100644 --- a/backend/tests/integration/ipam/test_ipam_utilization.py +++ b/backend/tests/functional/ipam/test_ipam_utilization.py @@ -10,8 +10,8 @@ from infrahub.core.manager import NodeManager from infrahub.core.node import Node from infrahub.graphql.initialization import prepare_graphql_params +from tests.functional.ipam.base import TestIpam from tests.helpers.graphql import graphql -from tests.integration.ipam.base import TestIpam if TYPE_CHECKING: from infrahub.core.branch import Branch diff --git a/backend/tests/functional/ipam/test_load_concurrent_prefixes.py b/backend/tests/functional/ipam/test_load_concurrent_prefixes.py index d7fe59327e..4111ee0af1 100644 --- a/backend/tests/functional/ipam/test_load_concurrent_prefixes.py +++ b/backend/tests/functional/ipam/test_load_concurrent_prefixes.py @@ -1,7 +1,7 @@ import ipaddress from infrahub.database import InfrahubDatabase -from tests.integration.ipam.base import TestIpam +from tests.functional.ipam.base import TestIpam # See https://github.com/opsmill/infrahub/issues/4523 @@ -30,4 +30,5 @@ async def test_load_concurrent_prefixes( nodes = await client.all("IpamIPPrefix", prefetch_relationships=True, populate_store=True) for n in nodes: if n.prefix.value != network_8: + # Without locking mechanism server side, parent might not be present assert n.parent.peer.prefix.value == network_8 diff --git a/backend/tests/integration/ipam/test_proposed_change_reconcile.py b/backend/tests/functional/ipam/test_proposed_change_reconcile.py similarity index 100% rename from backend/tests/integration/ipam/test_proposed_change_reconcile.py rename to backend/tests/functional/ipam/test_proposed_change_reconcile.py diff --git a/backend/tests/integration/ipam/__init__.py b/backend/tests/integration/ipam/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/backend/tests/unit/core/test_get_kinds_lock.py b/backend/tests/unit/core/test_get_kinds_lock.py index 11829b69d4..85e1de3f52 100644 --- a/backend/tests/unit/core/test_get_kinds_lock.py +++ b/backend/tests/unit/core/test_get_kinds_lock.py @@ -1,6 +1,5 @@ from infrahub.core import registry from infrahub.database import InfrahubDatabase -from infrahub.lock import get_kinds_to_lock_on_object_mutation from tests.helpers.test_app import TestInfrahubApp @@ -13,13 +12,13 @@ async def test_get_kinds_lock( client, ): # CoreCredential has no uniqueness_constraint, but generic CorePasswordCredential has one - core_password_credential_node = registry.schema.get("CorePasswordCredential") - assert get_kinds_to_lock_on_object_mutation(core_password_credential_node) == ["CoreCredential"] + schema_branch = registry.schema.get_schema_branch(name=default_branch.name) + assert schema_branch._get_kinds_to_lock_on_object_mutation("CorePasswordCredential") == ["CoreCredential"] # 3 generics but only GenericAccount has a uniqueness_constraint - core_account_node = registry.schema.get("CoreAccount") - assert get_kinds_to_lock_on_object_mutation(core_account_node) == ["CoreGenericAccount"] + schema_branch = registry.schema.get_schema_branch(name=default_branch.name) + assert schema_branch._get_kinds_to_lock_on_object_mutation("CoreAccount") == ["CoreGenericAccount"] # No uniqueness_constraint, no generic - core_account_node = registry.schema.get("BuiltinIPPrefix") - assert get_kinds_to_lock_on_object_mutation(core_account_node) == [] + schema_branch = registry.schema.get_schema_branch(name=default_branch.name) + assert schema_branch._get_kinds_to_lock_on_object_mutation("BuiltinIPPrefix") == []