Skip to content

Commit

Permalink
Convert EventBranchRebased to InfrahubEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasG0 committed Nov 29, 2024
1 parent 094447b commit 78fd701
Show file tree
Hide file tree
Showing 18 changed files with 114 additions and 173 deletions.
39 changes: 17 additions & 22 deletions backend/infrahub/core/branch/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.core.validators.tasks import schema_validate_migrations
from infrahub.dependencies.registry import get_component_registry
from infrahub.events.branch_action import BranchCreateEvent, BranchDeleteEvent
from infrahub.events.branch_action import BranchCreateEvent, BranchDeleteEvent, BranchRebaseEvent
from infrahub.exceptions import BranchNotFoundError, MergeFailedError, ValidationError
from infrahub.graphql.mutations.models import BranchCreateModel # noqa: TCH001
from infrahub.log import get_log_data
Expand All @@ -31,6 +31,7 @@
from infrahub.worker import WORKER_IDENTITY
from infrahub.workflows.catalogue import (
BRANCH_CANCEL_PROPOSED_CHANGES,
DIFF_REFRESH_ALL,
GIT_REPOSITORIES_CREATE_BRANCH,
IPAM_RECONCILIATION,
)
Expand Down Expand Up @@ -95,7 +96,7 @@ async def rebase_branch(branch: str) -> None:
# NOTE there is a bit additional work in order to calculate a proper diff that will
# allow us to pull only the part of the schema that has changed, for now the safest option is to pull
# Everything
# schema_diff = await merger.has_schema_changes()
# schema_diff = await merger.has_schema_changes()a
# TODO Would be good to convert this part to a Prefect Task in order to track it properly
updated_schema = await registry.schema.load_schema_from_db(
db=service.database,
Expand All @@ -107,19 +108,19 @@ async def rebase_branch(branch: str) -> None:
obj.update_schema_hash()
await obj.save(db=service.database)

# Execute the migrations
migrations = await merger.calculate_migrations(target_schema=updated_schema)
# Execute the migrations
migrations = await merger.calculate_migrations(target_schema=updated_schema)

errors = await schema_apply_migrations(
message=SchemaApplyMigrationData(
branch=merger.source_branch,
new_schema=candidate_schema,
previous_schema=schema_in_main_before,
migrations=migrations,
errors = await schema_apply_migrations(
message=SchemaApplyMigrationData(
branch=merger.source_branch,
new_schema=candidate_schema,
previous_schema=schema_in_main_before,
migrations=migrations,
)
)
)
for error in errors:
log.error(error)
for error in errors:
log.error(error)

# -------------------------------------------------------------
# Trigger the reconciliation of IPAM data after the rebase
Expand All @@ -134,18 +135,12 @@ async def rebase_branch(branch: str) -> None:
workflow=IPAM_RECONCILIATION, parameters={"branch": obj.name, "ipam_node_details": ipam_node_details}
)

await service.workflow.submit_workflow(workflow=DIFF_REFRESH_ALL, parameters={"branch_name": obj.name})

# -------------------------------------------------------------
# Generate an event to indicate that a branch has been rebased
# NOTE: we still need to convert this event and potentially pull
# some tasks currently executed based on the event into this workflow
# -------------------------------------------------------------
log_data = get_log_data()
request_id = log_data.get("request_id", "")
message = messages.EventBranchRebased(
branch=obj.name,
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=request_id),
)
await service.send(message=message)
await service.event.send(event=BranchRebaseEvent(branch=obj.name, branch_id=obj.get_id()))


@flow(name="branch-merge", flow_run_name="Merge branch {branch} into main")
Expand Down
7 changes: 0 additions & 7 deletions backend/infrahub/core/diff/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,3 @@ class RequestDiffUpdate(BaseModel):
name: str | None = None
from_time: str | None = None
to_time: str | None = None


class RequestDiffRefresh(BaseModel):
"""Request diff be recalculated from scratch."""

branch_name: str = Field(..., description="The branch associated with the diff")
diff_id: str = Field(..., description="The id for this diff")
31 changes: 25 additions & 6 deletions backend/infrahub/core/diff/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

from infrahub.core import registry
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.diff.models import RequestDiffRefresh, RequestDiffUpdate
from infrahub.core.diff.models import RequestDiffUpdate
from infrahub.core.diff.repository.repository import DiffRepository
from infrahub.dependencies.registry import get_component_registry
from infrahub.log import get_logger
from infrahub.services import services
from infrahub.workflows.catalogue import DIFF_REFRESH
from infrahub.workflows.utils import add_branch_tag

log = get_logger()
Expand All @@ -31,14 +33,31 @@ async def update_diff(model: RequestDiffUpdate) -> None:
)


@flow(name="diff-refresh", flow_run_name="Recreate diff for branch {model.branch_name}")
async def refresh_diff(model: RequestDiffRefresh) -> None:
@flow(name="diff-refresh", flow_run_name="Recreate diff for branch {branch_name}")
async def refresh_diff(branch_name: str, diff_id: str) -> None:
service = services.service
await add_branch_tag(branch_name=model.branch_name)
await add_branch_tag(branch_name=branch_name)

component_registry = get_component_registry()
base_branch = await registry.get_branch(db=service.database, branch=registry.default_branch)
diff_branch = await registry.get_branch(db=service.database, branch=model.branch_name)
diff_branch = await registry.get_branch(db=service.database, branch=branch_name)

diff_coordinator = await component_registry.get_component(DiffCoordinator, db=service.database, branch=diff_branch)
await diff_coordinator.recalculate(base_branch=base_branch, diff_branch=diff_branch, diff_id=model.diff_id)
await diff_coordinator.recalculate(base_branch=base_branch, diff_branch=diff_branch, diff_id=diff_id)


@flow(name="diff-refresh-all", flow_run_name="Recreate all diffs for branch {branch_name}")
async def refresh_diff_all(branch_name: str) -> None:
service = services.service
await add_branch_tag(branch_name=branch_name)

component_registry = get_component_registry()
default_branch = registry.get_branch_from_registry()
diff_repository = await component_registry.get_component(DiffRepository, db=service.database, branch=default_branch)
diff_roots_to_refresh = await diff_repository.get_empty_roots(diff_branch_names=[branch_name])

for diff_root in diff_roots_to_refresh:
if diff_root.base_branch_name != diff_root.diff_branch_name:
await service.workflow.submit_workflow(
workflow=DIFF_REFRESH, parameters={"branch_name": diff_root.diff_branch_name, "diff_id": diff_root.uuid}
)
26 changes: 26 additions & 0 deletions backend/infrahub/events/branch_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from infrahub.message_bus import InfrahubMessage
from infrahub.message_bus.messages.event_branch_delete import EventBranchDelete
from infrahub.message_bus.messages.refresh_registry_branches import RefreshRegistryBranches
from infrahub.message_bus.messages.refresh_registry_rebasedbranch import RefreshRegistryRebasedBranch

from .models import InfrahubBranchEvent

Expand Down Expand Up @@ -63,3 +64,28 @@ def get_messages(self) -> list[InfrahubMessage]:
RefreshRegistryBranches(),
]
return events # type: ignore


class BranchRebaseEvent(InfrahubBranchEvent):
"""Event generated when a branch has been rebased"""

branch_id: str = Field(..., description="The ID of the mutated node")

def get_name(self) -> str:
return f"{self.get_event_namespace()}.branch.rebased"

def get_resource(self) -> dict[str, str]:
return {
"prefect.resource.id": f"infrahub.branch.{self.branch}",
"infrahub.branch.id": self.branch_id,
}

def get_messages(self) -> list[InfrahubMessage]:
events = [
# EventBranchRebased(
# branch=self.branch,
# meta=self.get_message_meta(),
# ),
RefreshRegistryRebasedBranch(branch=self.branch),
]
return events # type: ignore
2 changes: 1 addition & 1 deletion backend/infrahub/graphql/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def _handle_http_request(
if request.app.state.response_delay:
self.logger.info(f"Adding response delay of {request.app.state.response_delay} seconds")
# This is on purpose
time.sleep(request.app.state.response_delay) # noqa: ASYNC251
time.sleep(request.app.state.response_delay)

try:
operations = await _get_operation_from_request(request)
Expand Down
4 changes: 2 additions & 2 deletions backend/infrahub/graphql/mutations/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.diff.models import RequestDiffUpdate
from infrahub.dependencies.registry import get_component_registry
from infrahub.workflows.catalogue import REQUEST_DIFF_UPDATE
from infrahub.workflows.catalogue import DIFF_UPDATE

if TYPE_CHECKING:
from ..initialization import GraphqlContext
Expand Down Expand Up @@ -63,6 +63,6 @@ async def mutate(
to_time=to_timestamp_str,
)
if context.service:
await context.service.workflow.submit_workflow(workflow=REQUEST_DIFF_UPDATE, parameters={"model": model})
await context.service.workflow.submit_workflow(workflow=DIFF_UPDATE, parameters={"model": model})

return {"ok": True}
3 changes: 2 additions & 1 deletion backend/infrahub/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import structlog
from pydantic import TypeAdapter
from structlog.dev import plain_traceback

if TYPE_CHECKING:
from structlog.types import Processor
Expand Down Expand Up @@ -57,7 +58,7 @@ def configure_logging(production: bool, log_level: str) -> None:
if production:
log_renderer = structlog.processors.JSONRenderer()
else:
log_renderer = structlog.dev.ConsoleRenderer()
log_renderer = structlog.dev.ConsoleRenderer(exception_formatter=plain_traceback)

formatter = structlog.stdlib.ProcessorFormatter(
foreign_pre_chain=shared_processors,
Expand Down
1 change: 0 additions & 1 deletion backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"check.repository.merge_conflicts": check.repository.merge_conflicts,
"check.repository.user_check": check.repository.user_check,
"event.branch.merge": event.branch.merge,
"event.branch.rebased": event.branch.rebased,
"event.node.mutated": event.node.mutated,
"event.schema.update": event.schema.update,
"event.worker.new_primary_api": event.worker.new_primary_api,
Expand Down
35 changes: 3 additions & 32 deletions backend/infrahub/message_bus/operations/event/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@

from infrahub.core import registry
from infrahub.core.diff.model.path import BranchTrackingId
from infrahub.core.diff.models import RequestDiffRefresh, RequestDiffUpdate
from infrahub.core.diff.models import RequestDiffUpdate
from infrahub.core.diff.repository.repository import DiffRepository
from infrahub.dependencies.registry import get_component_registry
from infrahub.log import get_logger
from infrahub.message_bus import InfrahubMessage, messages
from infrahub.services import InfrahubServices
from infrahub.workflows.catalogue import (
REQUEST_DIFF_REFRESH,
REQUEST_DIFF_UPDATE,
DIFF_UPDATE,
TRIGGER_ARTIFACT_DEFINITION_GENERATE,
TRIGGER_GENERATOR_DEFINITION_RUN,
)
Expand Down Expand Up @@ -51,35 +50,7 @@ async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -
):
request_diff_update_model = RequestDiffUpdate(branch_name=diff_root.diff_branch_name)
await service.workflow.submit_workflow(
workflow=REQUEST_DIFF_UPDATE, parameters={"model": request_diff_update_model}
)

for event in events:
event.assign_meta(parent=message)
await service.send(message=event)


@flow(name="event-branch-rebased")
async def rebased(message: messages.EventBranchRebased, service: InfrahubServices) -> None:
log.info("Branch rebased", branch=message.branch)

events: List[InfrahubMessage] = [
messages.RefreshRegistryRebasedBranch(branch=message.branch),
]

# for every diff that touches the rebased branch, recalculate it
component_registry = get_component_registry()
default_branch = registry.get_branch_from_registry()
diff_repository = await component_registry.get_component(DiffRepository, db=service.database, branch=default_branch)
diff_roots_to_refresh = await diff_repository.get_empty_roots(diff_branch_names=[message.branch])

for diff_root in diff_roots_to_refresh:
if diff_root.base_branch_name != diff_root.diff_branch_name:
request_diff_refresh_model = RequestDiffRefresh(
branch_name=diff_root.diff_branch_name, diff_id=diff_root.uuid
)
await service.workflow.submit_workflow(
workflow=REQUEST_DIFF_REFRESH, parameters={"model": request_diff_refresh_model}
workflow=DIFF_UPDATE, parameters={"model": request_diff_update_model}
)

for event in events:
Expand Down
13 changes: 9 additions & 4 deletions backend/infrahub/workflows/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,27 @@
tags=[WorkflowTag.DATABASE_CHANGE],
)

REQUEST_DIFF_UPDATE = WorkflowDefinition(
DIFF_UPDATE = WorkflowDefinition(
name="diff-update",
type=WorkflowType.CORE,
module="infrahub.core.diff.tasks",
function="update_diff",
)

REQUEST_DIFF_REFRESH = WorkflowDefinition(
DIFF_REFRESH = WorkflowDefinition(
name="diff-refresh",
type=WorkflowType.CORE,
module="infrahub.core.diff.tasks",
function="refresh_diff",
)

DIFF_REFRESH_ALL = WorkflowDefinition(
name="diff-refresh-all",
type=WorkflowType.INTERNAL,
module="infrahub.core.diff.tasks",
function="refresh_diff_all",
)

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
name="git_repositories_sync",
type=WorkflowType.INTERNAL,
Expand Down Expand Up @@ -399,8 +406,6 @@
QUERY_COMPUTED_ATTRIBUTE_TRANSFORM_TARGETS,
REQUEST_ARTIFACT_DEFINITION_GENERATE,
REQUEST_ARTIFACT_GENERATE,
REQUEST_DIFF_REFRESH,
REQUEST_DIFF_UPDATE,
REQUEST_GENERATOR_DEFINITION_RUN,
REQUEST_GENERATOR_RUN,
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
Expand Down
5 changes: 5 additions & 0 deletions backend/tests/helpers/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,24 @@ async def workflow_local(self) -> AsyncGenerator[WorkflowLocalExecution, None]:

@pytest.fixture(scope="class")
async def register_internal_schema(self, db: InfrahubDatabase, default_branch: Branch) -> SchemaBranch:
print(f"start of register_internal_schema: {id(default_branch)=} and {id(registry.branch["main"])=}")
schema = SchemaRoot(**internal_schema)
schema_branch = registry.schema.register_schema(schema=schema, branch=default_branch.name)
default_branch.update_schema_hash()
await default_branch.save(db=db)
print(f"end of register_internal_schema: {id(default_branch)=} and {id(registry.branch["main"])=}")
return schema_branch

@pytest.fixture(scope="class")
async def register_core_schema(
self, db: InfrahubDatabase, default_branch: Branch, register_internal_schema: SchemaBranch
) -> SchemaBranch:
print(f"start of register_core_schema: {id(default_branch)=} and {id(registry.branch["main"])=}")
schema = SchemaRoot(**core_models)
schema_branch = registry.schema.register_schema(schema=schema, branch=default_branch.name)
default_branch.update_schema_hash()
await default_branch.save(db=db)
print(f"end of register_core_schema: {id(default_branch)=} and {id(registry.branch["main"])=}")
return schema_branch

@pytest.fixture(scope="class")
Expand Down Expand Up @@ -135,6 +139,7 @@ async def client(
async def initialize_registry(
self, db: InfrahubDatabase, register_core_schema: SchemaBranch, bus_simulator: BusSimulator, api_token: str
) -> None:
print(f"start of initialize_registry:{id(registry.branch["main"])=}")
admin_account = await create_account(
db=db, name="admin", password=config.SETTINGS.initial.admin_password, token_value=api_token
)
Expand Down
18 changes: 18 additions & 0 deletions backend/tests/integration/ipam/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,31 @@
from infrahub.core import registry
from infrahub.core.constants import InfrahubKind
from infrahub.core.node import Node
from infrahub.core.schema import SchemaRoot
from infrahub.core.schema.schema_branch import SchemaBranch
from tests.helpers.test_app import TestInfrahubApp

if TYPE_CHECKING:
from infrahub_sdk import InfrahubClient

from infrahub.core.schema import SchemaRoot
from infrahub.core.schema.schema_branch import SchemaBranch
from infrahub.database import InfrahubDatabase


class TestIpamReconcileBase(TestInfrahubApp):
@pytest.fixture(scope="class")
async def register_ipam_schema(
self, initialize_registry, ipam_schema: SchemaRoot, client: InfrahubClient
) -> SchemaBranch:
# We can't rely on default_branch here as default_branch and registry.get_branch_from_registry(registry.default_branch)
# actually are two distinct python objects. So updating default_branch would be useless as we later rely on the registry object.
# This is why this fixture depends on initialize_registry and client (finally calling initialize_registry too).
default_branch_name = registry.default_branch
schema_branch = registry.schema.register_schema(schema=ipam_schema, branch=default_branch_name)
registry.get_branch_from_registry(default_branch_name).update_schema_hash()
return schema_branch

@pytest.fixture(scope="class")
async def initial_dataset(
self,
Expand Down
Loading

0 comments on commit 78fd701

Please sign in to comment.