Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert EventBranchRebased to InfrahubEvent #5088

Open
wants to merge 5 commits into
base: release-1.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions backend/infrahub/core/branch/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.core.validators.tasks import schema_validate_migrations
from infrahub.dependencies.registry import get_component_registry
from infrahub.events.branch_action import BranchCreateEvent, BranchDeleteEvent
from infrahub.events.branch_action import BranchCreateEvent, BranchDeleteEvent, BranchRebaseEvent
from infrahub.exceptions import BranchNotFoundError, MergeFailedError, ValidationError
from infrahub.graphql.mutations.models import BranchCreateModel # noqa: TCH001
from infrahub.log import get_log_data
Expand All @@ -31,6 +31,7 @@
from infrahub.worker import WORKER_IDENTITY
from infrahub.workflows.catalogue import (
BRANCH_CANCEL_PROPOSED_CHANGES,
DIFF_REFRESH_ALL,
GIT_REPOSITORIES_CREATE_BRANCH,
IPAM_RECONCILIATION,
)
Expand Down Expand Up @@ -109,19 +110,19 @@ async def rebase_branch(branch: str) -> None:
obj.update_schema_hash()
await obj.save(db=service.database)

# Execute the migrations
migrations = await merger.calculate_migrations(target_schema=updated_schema)
# Execute the migrations
migrations = await merger.calculate_migrations(target_schema=updated_schema)

errors = await schema_apply_migrations(
message=SchemaApplyMigrationData(
branch=merger.source_branch,
new_schema=candidate_schema,
previous_schema=schema_in_main_before,
migrations=migrations,
errors = await schema_apply_migrations(
message=SchemaApplyMigrationData(
branch=merger.source_branch,
new_schema=candidate_schema,
previous_schema=schema_in_main_before,
migrations=migrations,
)
)
)
for error in errors:
log.error(error)
for error in errors:
log.error(error)

# -------------------------------------------------------------
# Trigger the reconciliation of IPAM data after the rebase
Expand All @@ -136,18 +137,12 @@ async def rebase_branch(branch: str) -> None:
workflow=IPAM_RECONCILIATION, parameters={"branch": obj.name, "ipam_node_details": ipam_node_details}
)

await service.workflow.submit_workflow(workflow=DIFF_REFRESH_ALL, parameters={"branch_name": obj.name})

# -------------------------------------------------------------
# Generate an event to indicate that a branch has been rebased
# NOTE: we still need to convert this event and potentially pull
# some tasks currently executed based on the event into this workflow
# -------------------------------------------------------------
log_data = get_log_data()
request_id = log_data.get("request_id", "")
message = messages.EventBranchRebased(
branch=obj.name,
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=request_id),
)
await service.send(message=message)
await service.event.send(event=BranchRebaseEvent(branch=obj.name, branch_id=obj.get_id()))


@flow(name="branch-merge", flow_run_name="Merge branch {branch} into main")
Expand Down
7 changes: 0 additions & 7 deletions backend/infrahub/core/diff/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,3 @@ class RequestDiffUpdate(BaseModel):
name: str | None = None
from_time: str | None = None
to_time: str | None = None


class RequestDiffRefresh(BaseModel):
"""Request diff be recalculated from scratch."""

branch_name: str = Field(..., description="The branch associated with the diff")
diff_id: str = Field(..., description="The id for this diff")
14 changes: 11 additions & 3 deletions backend/infrahub/core/diff/payload_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from infrahub.core.manager import NodeManager
from infrahub.core.registry import registry
from infrahub.exceptions import SchemaNotFoundError
from infrahub.log import get_logger

if TYPE_CHECKING:
Expand All @@ -14,12 +15,17 @@


async def get_display_labels_per_kind(
kind: str, ids: list[str], branch_name: str, db: InfrahubDatabase
kind: str, ids: list[str], branch_name: str, db: InfrahubDatabase, skip_missing_schema: bool = False
) -> dict[str, str]:
"""Return the display_labels of a list of nodes of a specific kind."""
branch = await registry.get_branch(branch=branch_name, db=db)
schema_branch = db.schema.get_schema_branch(name=branch.name)
fields = schema_branch.generate_fields_for_display_label(name=kind)
try:
fields = schema_branch.generate_fields_for_display_label(name=kind)
except SchemaNotFoundError:
if skip_missing_schema:
return {}
raise
nodes = await NodeManager.get_many(ids=ids, fields=fields, db=db, branch=branch)
return {node_id: await node.render_display_label(db=db) for node_id, node in nodes.items()}

Expand All @@ -31,7 +37,9 @@ async def get_display_labels(nodes: dict[str, dict[str, list[str]]], db: Infrahu
if branch_name not in response:
response[branch_name] = {}
for kind, ids in items.items():
labels = await get_display_labels_per_kind(kind=kind, ids=ids, db=db, branch_name=branch_name)
labels = await get_display_labels_per_kind(
kind=kind, ids=ids, db=db, branch_name=branch_name, skip_missing_schema=True
)
response[branch_name].update(labels)

return response
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file correspond to a cherry pick of #5128

31 changes: 25 additions & 6 deletions backend/infrahub/core/diff/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

from infrahub.core import registry
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.diff.models import RequestDiffRefresh, RequestDiffUpdate
from infrahub.core.diff.models import RequestDiffUpdate
from infrahub.core.diff.repository.repository import DiffRepository
from infrahub.dependencies.registry import get_component_registry
from infrahub.log import get_logger
from infrahub.services import services
from infrahub.workflows.catalogue import DIFF_REFRESH
from infrahub.workflows.utils import add_branch_tag

log = get_logger()
Expand All @@ -31,14 +33,31 @@ async def update_diff(model: RequestDiffUpdate) -> None:
)


@flow(name="diff-refresh", flow_run_name="Recreate diff for branch {model.branch_name}")
async def refresh_diff(model: RequestDiffRefresh) -> None:
@flow(name="diff-refresh", flow_run_name="Recreate diff for branch {branch_name}")
async def refresh_diff(branch_name: str, diff_id: str) -> None:
service = services.service
await add_branch_tag(branch_name=model.branch_name)
await add_branch_tag(branch_name=branch_name)

component_registry = get_component_registry()
base_branch = await registry.get_branch(db=service.database, branch=registry.default_branch)
diff_branch = await registry.get_branch(db=service.database, branch=model.branch_name)
diff_branch = await registry.get_branch(db=service.database, branch=branch_name)

diff_coordinator = await component_registry.get_component(DiffCoordinator, db=service.database, branch=diff_branch)
await diff_coordinator.recalculate(base_branch=base_branch, diff_branch=diff_branch, diff_id=model.diff_id)
await diff_coordinator.recalculate(base_branch=base_branch, diff_branch=diff_branch, diff_id=diff_id)


@flow(name="diff-refresh-all", flow_run_name="Recreate all diffs for branch {branch_name}")
async def refresh_diff_all(branch_name: str) -> None:
service = services.service
await add_branch_tag(branch_name=branch_name)

component_registry = get_component_registry()
default_branch = registry.get_branch_from_registry()
diff_repository = await component_registry.get_component(DiffRepository, db=service.database, branch=default_branch)
diff_roots_to_refresh = await diff_repository.get_empty_roots(diff_branch_names=[branch_name])

for diff_root in diff_roots_to_refresh:
if diff_root.base_branch_name != diff_root.diff_branch_name:
await service.workflow.submit_workflow(
workflow=DIFF_REFRESH, parameters={"branch_name": diff_root.diff_branch_name, "diff_id": diff_root.uuid}
)
26 changes: 26 additions & 0 deletions backend/infrahub/events/branch_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from infrahub.message_bus import InfrahubMessage
from infrahub.message_bus.messages.event_branch_delete import EventBranchDelete
from infrahub.message_bus.messages.refresh_registry_branches import RefreshRegistryBranches
from infrahub.message_bus.messages.refresh_registry_rebasedbranch import RefreshRegistryRebasedBranch

from .models import InfrahubBranchEvent

Expand Down Expand Up @@ -63,3 +64,28 @@ def get_messages(self) -> list[InfrahubMessage]:
RefreshRegistryBranches(),
]
return events # type: ignore


class BranchRebaseEvent(InfrahubBranchEvent):
"""Event generated when a branch has been rebased"""

branch_id: str = Field(..., description="The ID of the mutated node")

def get_name(self) -> str:
return f"{self.get_event_namespace()}.branch.rebased"

def get_resource(self) -> dict[str, str]:
return {
"prefect.resource.id": f"infrahub.branch.{self.branch}",
"infrahub.branch.id": self.branch_id,
}

def get_messages(self) -> list[InfrahubMessage]:
events = [
# EventBranchRebased(
# branch=self.branch,
# meta=self.get_message_meta(),
# ),
RefreshRegistryRebasedBranch(branch=self.branch),
]
return events # type: ignore
4 changes: 2 additions & 2 deletions backend/infrahub/graphql/mutations/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.diff.models import RequestDiffUpdate
from infrahub.dependencies.registry import get_component_registry
from infrahub.workflows.catalogue import REQUEST_DIFF_UPDATE
from infrahub.workflows.catalogue import DIFF_UPDATE

if TYPE_CHECKING:
from ..initialization import GraphqlContext
Expand Down Expand Up @@ -63,6 +63,6 @@ async def mutate(
to_time=to_timestamp_str,
)
if context.service:
await context.service.workflow.submit_workflow(workflow=REQUEST_DIFF_UPDATE, parameters={"model": model})
await context.service.workflow.submit_workflow(workflow=DIFF_UPDATE, parameters={"model": model})

return {"ok": True}
1 change: 0 additions & 1 deletion backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"check.repository.merge_conflicts": check.repository.merge_conflicts,
"check.repository.user_check": check.repository.user_check,
"event.branch.merge": event.branch.merge,
"event.branch.rebased": event.branch.rebased,
"event.node.mutated": event.node.mutated,
"event.schema.update": event.schema.update,
"event.worker.new_primary_api": event.worker.new_primary_api,
Expand Down
35 changes: 3 additions & 32 deletions backend/infrahub/message_bus/operations/event/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@

from infrahub.core import registry
from infrahub.core.diff.model.path import BranchTrackingId
from infrahub.core.diff.models import RequestDiffRefresh, RequestDiffUpdate
from infrahub.core.diff.models import RequestDiffUpdate
from infrahub.core.diff.repository.repository import DiffRepository
from infrahub.dependencies.registry import get_component_registry
from infrahub.log import get_logger
from infrahub.message_bus import InfrahubMessage, messages
from infrahub.services import InfrahubServices
from infrahub.workflows.catalogue import (
REQUEST_DIFF_REFRESH,
REQUEST_DIFF_UPDATE,
DIFF_UPDATE,
TRIGGER_ARTIFACT_DEFINITION_GENERATE,
TRIGGER_GENERATOR_DEFINITION_RUN,
)
Expand Down Expand Up @@ -51,35 +50,7 @@ async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -
):
request_diff_update_model = RequestDiffUpdate(branch_name=diff_root.diff_branch_name)
await service.workflow.submit_workflow(
workflow=REQUEST_DIFF_UPDATE, parameters={"model": request_diff_update_model}
)

for event in events:
event.assign_meta(parent=message)
await service.send(message=event)


@flow(name="event-branch-rebased")
async def rebased(message: messages.EventBranchRebased, service: InfrahubServices) -> None:
log.info("Branch rebased", branch=message.branch)

events: List[InfrahubMessage] = [
messages.RefreshRegistryRebasedBranch(branch=message.branch),
]

# for every diff that touches the rebased branch, recalculate it
component_registry = get_component_registry()
default_branch = registry.get_branch_from_registry()
diff_repository = await component_registry.get_component(DiffRepository, db=service.database, branch=default_branch)
diff_roots_to_refresh = await diff_repository.get_empty_roots(diff_branch_names=[message.branch])

for diff_root in diff_roots_to_refresh:
if diff_root.base_branch_name != diff_root.diff_branch_name:
request_diff_refresh_model = RequestDiffRefresh(
branch_name=diff_root.diff_branch_name, diff_id=diff_root.uuid
)
await service.workflow.submit_workflow(
workflow=REQUEST_DIFF_REFRESH, parameters={"model": request_diff_refresh_model}
workflow=DIFF_UPDATE, parameters={"model": request_diff_update_model}
)

for event in events:
Expand Down
16 changes: 12 additions & 4 deletions backend/infrahub/workflows/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,27 @@
tags=[WorkflowTag.DATABASE_CHANGE],
)

REQUEST_DIFF_UPDATE = WorkflowDefinition(
DIFF_UPDATE = WorkflowDefinition(
name="diff-update",
type=WorkflowType.CORE,
module="infrahub.core.diff.tasks",
function="update_diff",
)

REQUEST_DIFF_REFRESH = WorkflowDefinition(
DIFF_REFRESH = WorkflowDefinition(
name="diff-refresh",
type=WorkflowType.CORE,
module="infrahub.core.diff.tasks",
function="refresh_diff",
)

DIFF_REFRESH_ALL = WorkflowDefinition(
name="diff-refresh-all",
type=WorkflowType.INTERNAL,
module="infrahub.core.diff.tasks",
function="refresh_diff_all",
)

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
name="git_repositories_sync",
type=WorkflowType.INTERNAL,
Expand Down Expand Up @@ -385,6 +392,9 @@
BRANCH_VALIDATE,
COMPUTED_ATTRIBUTE_SETUP,
COMPUTED_ATTRIBUTE_SETUP_PYTHON,
DIFF_REFRESH,
DIFF_REFRESH_ALL,
DIFF_UPDATE,
GIT_REPOSITORIES_CREATE_BRANCH,
GIT_REPOSITORIES_DIFF_NAMES_ONLY,
GIT_REPOSITORIES_IMPORT_OBJECTS,
Expand All @@ -399,8 +409,6 @@
QUERY_COMPUTED_ATTRIBUTE_TRANSFORM_TARGETS,
REQUEST_ARTIFACT_DEFINITION_GENERATE,
REQUEST_ARTIFACT_GENERATE,
REQUEST_DIFF_REFRESH,
REQUEST_DIFF_UPDATE,
REQUEST_GENERATOR_DEFINITION_RUN,
REQUEST_GENERATOR_RUN,
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
Expand Down
12 changes: 5 additions & 7 deletions backend/tests/helpers/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
async def load_schema(
db: InfrahubDatabase, schema: SchemaRoot, branch_name: str | None = None, update_db: bool = False
) -> None:
default_branch_name = registry.default_branch
branch_schema = registry.schema.get_schema_branch(name=branch_name or default_branch_name)
tmp_schema = branch_schema.duplicate()
tmp_schema.load_schema(schema=schema)
tmp_schema.process()

branch_name = branch_name or registry.default_branch
branch_schema = registry.schema.get_schema_branch(name=branch_name)
registry.schema.register_schema(schema=schema, branch=branch_name)
await registry.schema.update_schema_branch(
schema=tmp_schema, db=db, branch=branch_name or default_branch_name, update_db=update_db
schema=branch_schema.duplicate(), db=db, branch=branch_name, update_db=update_db
)
registry.get_branch_from_registry(branch_name).update_schema_hash()


__all__ = [
Expand Down
23 changes: 22 additions & 1 deletion backend/tests/integration/ipam/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,34 @@
from infrahub.core import registry
from infrahub.core.constants import InfrahubKind
from infrahub.core.node import Node
from infrahub.core.schema import SchemaRoot
from infrahub.core.schema.schema_branch import SchemaBranch
from tests.helpers.test_app import TestInfrahubApp

if TYPE_CHECKING:
from infrahub_sdk import InfrahubClient

from infrahub.core.schema import SchemaRoot
from infrahub.core.schema.schema_branch import SchemaBranch
from infrahub.database import InfrahubDatabase


class TestIpamReconcileBase(TestInfrahubApp):
class TestIpam(TestInfrahubApp):
@pytest.fixture(scope="class")
async def register_ipam_schema(
self, initialize_registry, ipam_schema: SchemaRoot, client: InfrahubClient
) -> SchemaBranch:
# During registry initialization, default_branch might have already been loaded in db, and thus a new python
# object corresponding to default branch would be created. Then, we can't rely on default_branch fixture here,
# as updating default_branch schema hash would not update the object within registry.
# This is why this fixture depends on initialize_registry and client (which calls initialize_registry).
default_branch_name = registry.default_branch
schema_branch = registry.schema.register_schema(schema=ipam_schema, branch=default_branch_name)
registry.get_branch_from_registry(default_branch_name).update_schema_hash()
return schema_branch


class TestIpamReconcileBase(TestIpam):
@pytest.fixture(scope="class")
async def initial_dataset(
self,
Expand Down
Loading
Loading