Skip to content

Commit

Permalink
Merge pull request #5194 from opsmill/lgu-add-lock
Browse files Browse the repository at this point in the history
Add lock on object creation
  • Loading branch information
LucasG0 authored Dec 13, 2024
2 parents f76fa3c + 3e3f187 commit 938522d
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 46 deletions.
44 changes: 44 additions & 0 deletions backend/infrahub/core/schema/schema_branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from infrahub.utils import format_label
from infrahub.visuals import select_color

from ...lock import build_object_lock_name
from .constants import INTERNAL_SCHEMA_NODE_KINDS, SchemaNamespace
from .schema_branch_computed import ComputedAttributes

Expand All @@ -61,6 +62,8 @@
if TYPE_CHECKING:
from pydantic import ValidationInfo

from ..branch import Branch

# pylint: disable=redefined-builtin,too-many-public-methods,too-many-lines


Expand Down Expand Up @@ -1725,3 +1728,44 @@ def generate_profile_from_node(self, node: NodeSchema) -> ProfileSchema:
profile.attributes.append(attr)

return profile

def _get_kinds_to_lock_on_object_mutation(self, kind: str) -> list[str]:
"""
Return kinds for which we want to lock during creating / updating an object of a given schema node.
Lock should be performed on schema kind and its generics having a uniqueness_constraint defined.
If a generic uniqueness constraint is the same as the node schema one,
it means node schema overrided this constraint, in which case we only need to lock on the generic.
"""

node_schema = self.get(name=kind)

schema_uc = None
kinds = []
if node_schema.uniqueness_constraints:
kinds.append(node_schema.kind)
schema_uc = node_schema.uniqueness_constraints

if node_schema.is_generic_schema:
return kinds

generics_kinds = node_schema.inherit_from

node_schema_kind_removed = False
for generic_kind in generics_kinds:
generic_uc = self.get(name=generic_kind).uniqueness_constraints
if generic_uc:
kinds.append(generic_kind)
if not node_schema_kind_removed and generic_uc == schema_uc:
# Check whether we should remove original schema kind as it simply overrides uniqueness_constraint
# of a generic
kinds.pop(0)
node_schema_kind_removed = True
return kinds

def get_kind_lock_names_on_object_mutation(self, kind: str, branch: Branch) -> list[str]:
if not branch.is_default:
# Do not lock on other branches as objects validations will be performed at least when merging in main branch.
return []
lock_kinds = self._get_kinds_to_lock_on_object_mutation(kind)
lock_names = [build_object_lock_name(kind) for kind in lock_kinds]
return lock_names
175 changes: 143 additions & 32 deletions backend/infrahub/graphql/mutations/ipam.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import ipaddress
from ipaddress import IPv4Interface
from typing import TYPE_CHECKING, Any, Optional

from graphene import InputObjectType, Mutation
from graphql import GraphQLResolveInfo
from typing_extensions import Self

from infrahub import lock
from infrahub.core import registry
from infrahub.core.branch import Branch
from infrahub.core.constants import InfrahubKind
Expand All @@ -15,6 +17,7 @@
from infrahub.database import InfrahubDatabase, retry_db_transaction
from infrahub.exceptions import NodeNotFoundError, ValidationError
from infrahub.graphql.mutations.node_getter.interface import MutationNodeGetterInterface
from infrahub.lock import InfrahubMultiLock, build_object_lock_name
from infrahub.log import get_logger

from .main import InfrahubMutationMixin, InfrahubMutationOptions
Expand Down Expand Up @@ -92,6 +95,29 @@ def __init_subclass_with_meta__( # pylint: disable=arguments-differ

super().__init_subclass_with_meta__(_meta=_meta, **options)

@staticmethod
def _get_lock_name(namespace_id: str, branch: Branch) -> str | None:
if not branch.is_default:
# Do not lock on other branches as reconciliation will be performed at least when merging in main branch.
return None
return build_object_lock_name(InfrahubKind.IPADDRESS + "_" + namespace_id)

@classmethod
async def _mutate_create_object_and_reconcile(
cls,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
ip_address: IPv4Interface | ipaddress.IPv6Interface,
namespace_id: str,
) -> Node:
address = await cls.mutate_create_object(data=data, db=db, branch=branch)
reconciler = IpamReconciler(db=db, branch=branch)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id()
)
return reconciled_address

@classmethod
@retry_db_transaction(name="ipaddress_create")
async def mutate_create(
Expand All @@ -107,16 +133,37 @@ async def mutate_create(
namespace_id = await validate_namespace(db=db, data=data)

async with db.start_transaction() as dbt:
address = await cls.mutate_create_object(data=data, db=dbt, branch=branch)
reconciler = IpamReconciler(db=dbt, branch=branch)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, namespace=namespace_id, node_uuid=address.get_id()
)

result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_address)
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_address = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, ip_address=ip_address, namespace_id=namespace_id
)
else:
reconciled_address = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, ip_address=ip_address, namespace_id=namespace_id
)
result = await cls.mutate_create_to_graphql(info=info, db=dbt, obj=reconciled_address)

return reconciled_address, result

@classmethod
async def _mutate_update_object_and_reconcile(
cls,
info: GraphQLResolveInfo,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
address: Node,
namespace_id: str,
) -> Node:
address = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=address)
reconciler = IpamReconciler(db=db, branch=branch)
ip_address = ipaddress.ip_interface(address.address.value)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id
)
return reconciled_address

@classmethod
@retry_db_transaction(name="ipaddress_update")
async def mutate_update(
Expand All @@ -142,12 +189,15 @@ async def mutate_update(
namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id)
try:
async with db.start_transaction() as dbt:
address = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=address)
reconciler = IpamReconciler(db=dbt, branch=branch)
ip_address = ipaddress.ip_interface(address.address.value)
reconciled_address = await reconciler.reconcile(
ip_value=ip_address, node_uuid=address.get_id(), namespace=namespace_id
)
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_address = await cls._mutate_update_object_and_reconcile(
info=info, data=data, branch=branch, address=address, namespace_id=namespace_id, db=dbt
)
else:
reconciled_address = await cls._mutate_update_object_and_reconcile(
info=info, data=data, branch=branch, address=address, namespace_id=namespace_id, db=dbt
)

result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_address)
except ValidationError as exc:
Expand Down Expand Up @@ -202,6 +252,26 @@ def __init_subclass_with_meta__( # pylint: disable=arguments-differ

super().__init_subclass_with_meta__(_meta=_meta, **options)

@staticmethod
def _get_lock_name(namespace_id: str, branch: Branch) -> str | None:
if not branch.is_default:
# Do not lock on other branches as reconciliation will be performed at least when merging in main branch.
return None
return build_object_lock_name(InfrahubKind.IPPREFIX + "_" + namespace_id)

@classmethod
async def _mutate_create_object_and_reconcile(
cls,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
namespace_id: str,
) -> Node:
prefix = await cls.mutate_create_object(data=data, db=db, branch=branch)
return await cls._reconcile_prefix(
branch=branch, db=db, prefix=prefix, namespace_id=namespace_id, is_delete=False
)

@classmethod
@retry_db_transaction(name="ipprefix_create")
async def mutate_create(
Expand All @@ -213,20 +283,38 @@ async def mutate_create(
) -> tuple[Node, Self]:
context: GraphqlContext = info.context
db = database or context.db
ip_network = ipaddress.ip_network(data["prefix"]["value"])
namespace_id = await validate_namespace(db=db, data=data)

async with db.start_transaction() as dbt:
prefix = await cls.mutate_create_object(data=data, db=dbt, branch=branch)
reconciler = IpamReconciler(db=dbt, branch=branch)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, namespace=namespace_id, node_uuid=prefix.get_id()
)
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_prefix = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, namespace_id=namespace_id
)
else:
reconciled_prefix = await cls._mutate_create_object_and_reconcile(
data=data, branch=branch, db=dbt, namespace_id=namespace_id
)

result = await cls.mutate_create_to_graphql(info=info, db=db, obj=reconciled_prefix)
result = await cls.mutate_create_to_graphql(info=info, db=dbt, obj=reconciled_prefix)

return reconciled_prefix, result

@classmethod
async def _mutate_update_object_and_reconcile(
cls,
info: GraphQLResolveInfo,
data: InputObjectType,
branch: Branch,
db: InfrahubDatabase,
prefix: Node,
namespace_id: str,
) -> Node:
prefix = await cls.mutate_update_object(db=db, info=info, data=data, branch=branch, obj=prefix)
return await cls._reconcile_prefix(
branch=branch, db=db, prefix=prefix, namespace_id=namespace_id, is_delete=False
)

@classmethod
@retry_db_transaction(name="ipprefix_update")
async def mutate_update(
Expand All @@ -252,13 +340,15 @@ async def mutate_update(
namespace_id = await validate_namespace(db=db, data=data, existing_namespace_id=namespace.id)
try:
async with db.start_transaction() as dbt:
prefix = await cls.mutate_update_object(db=dbt, info=info, data=data, branch=branch, obj=prefix)
reconciler = IpamReconciler(db=dbt, branch=branch)
ip_network = ipaddress.ip_network(prefix.prefix.value)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id
)

if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_prefix = await cls._mutate_update_object_and_reconcile(
info=info, data=data, prefix=prefix, db=dbt, namespace_id=namespace_id, branch=branch
)
else:
reconciled_prefix = await cls._mutate_update_object_and_reconcile(
info=info, data=data, prefix=prefix, db=dbt, namespace_id=namespace_id, branch=branch
)
result = await cls.mutate_update_to_graphql(db=dbt, info=info, obj=reconciled_prefix)
except ValidationError as exc:
raise ValueError(str(exc)) from exc
Expand All @@ -284,6 +374,22 @@ async def mutate_upsert(

return prefix, result, created

@classmethod
async def _reconcile_prefix(
cls,
branch: Branch,
db: InfrahubDatabase,
prefix: Node,
namespace_id: str,
is_delete: bool,
) -> Node:
reconciler = IpamReconciler(db=db, branch=branch)
ip_network = ipaddress.ip_network(prefix.prefix.value)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=is_delete
)
return reconciled_prefix

@classmethod
@retry_db_transaction(name="ipprefix_delete")
async def mutate_delete(
Expand All @@ -303,11 +409,16 @@ async def mutate_delete(
namespace_id = namespace_rels[0].peer_id
try:
async with context.db.start_transaction() as dbt:
reconciler = IpamReconciler(db=dbt, branch=branch)
ip_network = ipaddress.ip_network(prefix.prefix.value)
reconciled_prefix = await reconciler.reconcile(
ip_value=ip_network, node_uuid=prefix.get_id(), namespace=namespace_id, is_delete=True
)
if lock_name := cls._get_lock_name(namespace_id, branch):
async with InfrahubMultiLock(lock_registry=lock.registry, locks=[lock_name]):
reconciled_prefix = await cls._reconcile_prefix(
branch=branch, db=dbt, prefix=prefix, namespace_id=namespace_id, is_delete=True
)
else:
reconciled_prefix = await cls._reconcile_prefix(
branch=branch, db=dbt, prefix=prefix, namespace_id=namespace_id, is_delete=True
)

except ValidationError as exc:
raise ValueError(str(exc)) from exc

Expand Down
Loading

0 comments on commit 938522d

Please sign in to comment.