Skip to content

Commit

Permalink
Only lock on main branch + refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasG0 committed Dec 13, 2024
1 parent d36c7f6 commit 3e3f187
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 58 deletions.
7 changes: 6 additions & 1 deletion backend/infrahub/core/schema/schema_branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
if TYPE_CHECKING:
from pydantic import ValidationInfo

from ..branch import Branch

# pylint: disable=redefined-builtin,too-many-public-methods,too-many-lines


Expand Down Expand Up @@ -1760,7 +1762,10 @@ def _get_kinds_to_lock_on_object_mutation(self, kind: str) -> list[str]:
node_schema_kind_removed = True
return kinds

def get_kind_lock_names_on_object_mutation(self, kind: str) -> list[str]:
def get_kind_lock_names_on_object_mutation(self, kind: str, branch: Branch) -> list[str]:
if not branch.is_default:
# Do not lock on other branches as objects validations will be performed at least when merging in main branch.
return []
lock_kinds = self._get_kinds_to_lock_on_object_mutation(kind)
lock_names = [build_object_lock_name(kind) for kind in lock_kinds]
return lock_names
187 changes: 143 additions & 44 deletions backend/infrahub/graphql/mutations/ipam.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import ipaddress
from ipaddress import IPv4Interface
from typing import TYPE_CHECKING, Any, Optional

from graphene import InputObjectType, Mutation
from graphql import GraphQLResolveInfo
from typing_extensions import Self

from infrahub import lock
from infrahub.core import registry
from infrahub.core.branch import Branch
from infrahub.core.constants import InfrahubKind
Expand All @@ -15,10 +17,9 @@
from infrahub.database import InfrahubDatabase, retry_db_transaction
from infrahub.exceptions import NodeNotFoundError, ValidationError
from infrahub.graphql.mutations.node_getter.interface import MutationNodeGetterInterface
from infrahub.lock import InfrahubMultiLock, build_object_lock_name
from infrahub.log import get_logger

from ... import lock
from ...lock import InfrahubMultiLock, build_object_lock_name
from .main import InfrahubMutationMixin, InfrahubMutationOptions

if TYPE_CHECKING:
Expand Down Expand Up @@ -94,6 +95,29 @@ def __init_subclass_with_meta__( # pylint: disable=arguments-differ

super().__init_subclass_with_meta__(_meta=_meta, **options)

@staticmethod
def _get_lock_name(namespace_id: str, branch: Branch) -> str | None:
if not branch.is_default:
# Do not lock on other branches as reconciliation will be performed at least when merging in main branch.
return None
return build_object_lock_name(InfrahubKind.IPADDRESS + "_" + namespace_id)

@classmethod
async def _mutate_create_object_and_reconcile(
cls,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
ip_address: IPv4Interface | ipaddress.IPv6Interface,
namespace_id: str,
) -> Node:
address = await cls.mutate_create_object(data=data, db=db, branch=branch)
reconciler = IpamReconciler(db=db, branch=branch)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id()
)
return reconciled_address

@classmethod
@retry_db_transaction(name="ipaddress_create")
async def mutate_create(
Expand All @@ -108,19 +132,38 @@ async def mutate_create(
ip_address = ipaddress.ip_interface(data["address"]["value"])
namespace_id = await validate_namespace(db=db, data=data)

lock_name = build_object_lock_name(cls._meta.schema.kind)
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
async with db.start_transaction() as dbt:
address = await cls.mutate_create_object(data=data, db=dbt, branch=branch)
reconciler = IpamReconciler(db=dbt, branch=branch)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id()
async with db.start_transaction() as dbt:
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_address = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, ip_address=ip_address, namespace_id=namespace_id
)
else:
reconciled_address = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, ip_address=ip_address, namespace_id=namespace_id
)

result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_address)
result = await cls.mutate_create_to_graphql(info=info, db=dbt, obj=reconciled_address)

return reconciled_address, result

@classmethod
async def _mutate_update_object_and_reconcile(
cls,
info: GraphQLResolveInfo,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
address: Node,
namespace_id: str,
) -> Node:
address = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=address)
reconciler = IpamReconciler(db=db, branch=branch)
ip_address = ipaddress.ip_interface(address.address.value)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id
)
return reconciled_address

@classmethod
@retry_db_transaction(name="ipaddress_update")
async def mutate_update(
Expand All @@ -145,14 +188,15 @@ async def mutate_update(
namespace = await address.ip_namespace.get_peer(db)
namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id)
try:
lock_name = build_object_lock_name(cls._meta.schema.kind)
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
async with db.start_transaction() as dbt:
address = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=address)
reconciler = IpamReconciler(db=dbt, branch=branch)
ip_address = ipaddress.ip_interface(address.address.value)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id
async with db.start_transaction() as dbt:
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_address = await cls._mutate_update_object_and_reconcile(
info=info, data=data, branch=branch, address=address, namespace_id=namespace_id, db=dbt
)
else:
reconciled_address = await cls._mutate_update_object_and_reconcile(
info=info, data=data, branch=branch, address=address, namespace_id=namespace_id, db=dbt
)

result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_address)
Expand Down Expand Up @@ -208,6 +252,26 @@ def __init_subclass_with_meta__( # pylint: disable=arguments-differ

super().__init_subclass_with_meta__(_meta=_meta, **options)

@staticmethod
def _get_lock_name(namespace_id: str, branch: Branch) -> str | None:
if not branch.is_default:
# Do not lock on other branches as reconciliation will be performed at least when merging in main branch.
return None
return build_object_lock_name(InfrahubKind.IPPREFIX + "_" + namespace_id)

@classmethod
async def _mutate_create_object_and_reconcile(
cls,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
namespace_id: str,
) -> Node:
prefix = await cls.mutate_create_object(data=data, db=db, branch=branch)
return await cls._reconcile_prefix(
branch=branch, db=db, prefix=prefix, namespace_id=namespace_id, is_delete=False
)

@classmethod
@retry_db_transaction(name="ipprefix_create")
async def mutate_create(
Expand All @@ -219,22 +283,38 @@ async def mutate_create(
) -> tuple[Node, Self]:
context: GraphqlContext = info.context
db = database or context.db
ip_network = ipaddress.ip_network(data["prefix"]["value"])
namespace_id = await validate_namespace(db=db, data=data)

lock_name = build_object_lock_name(cls._meta.schema.kind)
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
async with db.start_transaction() as dbt:
prefix = await cls.mutate_create_object(data=data, db=dbt, branch=branch)
reconciler = IpamReconciler(db=dbt, branch=branch)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, namespace=namespace_id, node_uuid=prefix.get_id()
async with db.start_transaction() as dbt:
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_prefix = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, namespace_id=namespace_id
)
else:
reconciled_prefix = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, namespace_id=namespace_id
)

result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_prefix)
result = await cls.mutate_create_to_graphql(info=info, db=dbt, obj=reconciled_prefix)

return reconciled_prefix, result

@classmethod
async def _mutate_update_object_and_reconcile(
cls,
info: GraphQLResolveInfo,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
prefix: Node,
namespace_id: str,
) -> Node:
prefix = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=prefix)
return await cls._reconcile_prefix(
branch=branch, db=db, prefix=prefix, namespace_id=namespace_id, is_delete=False
)

@classmethod
@retry_db_transaction(name="ipprefix_update")
async def mutate_update(
Expand All @@ -259,16 +339,16 @@ async def mutate_update(
namespace = await prefix.ip_namespace.get_peer(db)
namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id)
try:
lock_name = build_object_lock_name(cls._meta.schema.kind)
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
async with db.start_transaction() as dbt:
prefix = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=prefix)
reconciler = IpamReconciler(db=dbt, branch=branch)
ip_network = ipaddress.ip_network(prefix.prefix.value)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id
async with db.start_transaction() as dbt:
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_prefix = await cls._mutate_update_object_and_reconcile(
info=info, data=data, prefix=prefix, db=dbt, namespace_id=namespace_id, branch=branch
)
else:
reconciled_prefix = await cls._mutate_update_object_and_reconcile(
info=info, data=data, prefix=prefix, db=dbt, namespace_id=namespace_id, branch=branch
)

result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_prefix)
except ValidationError as exc:
raise ValueError(str(exc)) from exc
Expand All @@ -294,6 +374,22 @@ async def mutate_upsert(

return prefix, result, created

@classmethod
async def _reconcile_prefix(
cls,
branch: Branch,
db: InfrahubDatabase,
prefix: Node,
namespace_id: str,
is_delete: bool,
) -> Node:
reconciler = IpamReconciler(db=db, branch=branch)
ip_network = ipaddress.ip_network(prefix.prefix.value)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=is_delete
)
return reconciled_prefix

@classmethod
@retry_db_transaction(name="ipprefix_delete")
async def mutate_delete(
Expand All @@ -312,14 +408,17 @@ async def mutate_delete(
namespace_rels = await prefix.ip_namespace.get_relationships(db=db)
namespace_id = namespace_rels[0].peer_id
try:
lock_name = build_object_lock_name(cls._meta.schema.kind)
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
async with context.db.start_transaction() as dbt:
reconciler = IpamReconciler(db=dbt, branch=branch)
ip_network = ipaddress.ip_network(prefix.prefix.value)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=True
async with context.db.start_transaction() as dbt:
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_prefix = await cls._reconcile_prefix(
branch=branch, db=dbt, prefix=prefix, namespace_id=namespace_id, is_delete=True
)
else:
reconciled_prefix = await cls._reconcile_prefix(
branch=branch, db=dbt, prefix=prefix, namespace_id=namespace_id, is_delete=True
)

except ValidationError as exc:
raise ValueError(str(exc)) from exc

Expand Down
8 changes: 4 additions & 4 deletions backend/infrahub/graphql/mutations/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ async def _call_mutate_create_object(cls, data: InputObjectType, db: InfrahubDat
"""
Wrapper around mutate_create_object to potentially activate locking.
"""
lock_names = registry.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation(
cls._meta.schema.kind
lock_names = db.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation(
kind=cls._meta.schema.kind, branch=branch
)
if lock_names:
async with InfrahubMultiLock(lock_registry=lock.registry, locks=lock_names):
Expand Down Expand Up @@ -217,8 +217,8 @@ async def _call_mutate_update(
Wrapper around mutate_update to potentially activate locking and call it within a database transaction.
"""

lock_names = registry.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation(
cls._meta.schema.kind
lock_names = db.schema.get_schema_branch(name=branch.name).get_kind_lock_names_on_object_mutation(
kind=cls._meta.schema.kind, branch=branch
)

if db.is_transaction:
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from infrahub.core.manager import NodeManager
from infrahub.core.node import Node
from infrahub.graphql.initialization import prepare_graphql_params
from tests.functional.ipam.base import TestIpam
from tests.helpers.graphql import graphql
from tests.integration.ipam.base import TestIpam

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ipaddress

from infrahub.database import InfrahubDatabase
from tests.integration.ipam.base import TestIpam
from tests.functional.ipam.base import TestIpam


# See https://github.com/opsmill/infrahub/issues/4523
Expand Down Expand Up @@ -30,4 +30,5 @@ async def test_load_concurrent_prefixes(
nodes = await client.all("IpamIPPrefix", prefetch_relationships=True, populate_store=True)
for n in nodes:
if n.prefix.value != network_8:
# Without locking mechanism server side, parent might not be present
assert n.parent.peer.prefix.value == network_8
Empty file.
13 changes: 6 additions & 7 deletions backend/tests/unit/core/test_get_kinds_lock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from infrahub.core import registry
from infrahub.database import InfrahubDatabase
from infrahub.lock import get_kinds_to_lock_on_object_mutation
from tests.helpers.test_app import TestInfrahubApp


Expand All @@ -13,13 +12,13 @@ async def test_get_kinds_lock(
client,
):
# CoreCredential has no uniqueness_constraint, but generic CorePasswordCredential has one
core_password_credential_node = registry.schema.get("CorePasswordCredential")
assert get_kinds_to_lock_on_object_mutation(core_password_credential_node) == ["CoreCredential"]
schema_branch = registry.schema.get_schema_branch(name=default_branch.name)
assert schema_branch._get_kinds_to_lock_on_object_mutation("CorePasswordCredential") == ["CoreCredential"]

# 3 generics but only GenericAccount has a uniqueness_constraint
core_account_node = registry.schema.get("CoreAccount")
assert get_kinds_to_lock_on_object_mutation(core_account_node) == ["CoreGenericAccount"]
schema_branch = registry.schema.get_schema_branch(name=default_branch.name)
assert schema_branch._get_kinds_to_lock_on_object_mutation("CoreAccount") == ["CoreGenericAccount"]

# No uniqueness_constraint, no generic
core_account_node = registry.schema.get("BuiltinIPPrefix")
assert get_kinds_to_lock_on_object_mutation(core_account_node) == []
schema_branch = registry.schema.get_schema_branch(name=default_branch.name)
assert schema_branch._get_kinds_to_lock_on_object_mutation("BuiltinIPPrefix") == []

0 comments on commit 3e3f187

Please sign in to comment.