From 54cc4d44e5a45ee03d7813ce5ee9588826be7b21 Mon Sep 17 00:00:00 2001 From: Damien Garros Date: Sun, 10 Nov 2024 21:40:11 +0100 Subject: [PATCH 1/4] Finalize the migration of schema validation and schema migration tasks --- .../infrahub/core/migrations/schema/models.py | 10 +- .../infrahub/core/migrations/schema/runner.py | 67 ------- .../infrahub/core/migrations/schema/tasks.py | 72 ++++++-- .../validators/models/validate_migration.py | 11 +- backend/infrahub/core/validators/tasks.py | 51 +++++- .../infrahub/message_bus/messages/__init__.py | 6 - .../messages/schema_migration_path.py | 38 ---- .../messages/schema_validator_path.py | 33 ---- .../message_bus/operations/__init__.py | 3 - .../message_bus/operations/schema/__init__.py | 3 - .../operations/schema/migration.py | 117 ------------ .../operations/schema/validator.py | 88 --------- backend/tests/unit/api/test_40_schema_api.py | 167 ++++++++++++++++++ .../test_attribute_regex_update.py | 29 --- .../schema/test_node_attribute_add.py | 32 ---- .../migrations/schema/test_node_remove.py | 35 ---- .../core/migrations/schema/test_runner.py | 49 ----- docs/docs/reference/message-bus-events.mdx | 86 --------- 18 files changed, 288 insertions(+), 609 deletions(-) delete mode 100644 backend/infrahub/core/migrations/schema/runner.py delete mode 100644 backend/infrahub/message_bus/messages/schema_migration_path.py delete mode 100644 backend/infrahub/message_bus/messages/schema_validator_path.py delete mode 100644 backend/infrahub/message_bus/operations/schema/__init__.py delete mode 100644 backend/infrahub/message_bus/operations/schema/migration.py delete mode 100644 backend/infrahub/message_bus/operations/schema/validator.py delete mode 100644 backend/tests/unit/core/migrations/schema/test_runner.py diff --git a/backend/infrahub/core/migrations/schema/models.py b/backend/infrahub/core/migrations/schema/models.py index 16e0a7fad5..321e6c1779 100644 --- a/backend/infrahub/core/migrations/schema/models.py +++ b/backend/infrahub/core/migrations/schema/models.py @@ -1,7 +1,8 @@ -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field from infrahub.core.branch import Branch from infrahub.core.models import SchemaUpdateMigrationInfo +from infrahub.core.path import SchemaPath from infrahub.core.schema.schema_branch import SchemaBranch @@ -13,3 +14,10 @@ class SchemaApplyMigrationData(BaseModel): new_schema: SchemaBranch previous_schema: SchemaBranch migrations: list[SchemaUpdateMigrationInfo] + + +class SchemaMigrationPathResponseData(BaseModel): + errors: list[str] = Field(default_factory=list) + migration_name: str | None = None + nbr_migrations_executed: int | None = None + schema_path: SchemaPath | None = None diff --git a/backend/infrahub/core/migrations/schema/runner.py b/backend/infrahub/core/migrations/schema/runner.py deleted file mode 100644 index 8ebd66fb19..0000000000 --- a/backend/infrahub/core/migrations/schema/runner.py +++ /dev/null @@ -1,67 +0,0 @@ -from __future__ import annotations - -import asyncio -from typing import TYPE_CHECKING, Optional - -from infrahub.message_bus.messages.schema_migration_path import ( - SchemaMigrationPath, - SchemaMigrationPathResponse, -) - -if TYPE_CHECKING: - from infrahub.core.branch import Branch - from infrahub.core.models import SchemaUpdateMigrationInfo - from infrahub.core.schema import MainSchemaTypes - from infrahub.core.schema.schema_branch import SchemaBranch - from infrahub.services import InfrahubServices - - -async def schema_migrations_runner( - branch: Branch, - new_schema: SchemaBranch, - previous_schema: SchemaBranch, - migrations: list[SchemaUpdateMigrationInfo], - service: InfrahubServices, -) -> list[str]: - tasks = [] - error_messages: list[str] = [] - - if not migrations: - return error_messages - - for migration in migrations: - service.log.info( - f"Preparing migration for {migration.migration_name!r} ({migration.routing_key})", branch=branch.name - ) - - new_node_schema: Optional[MainSchemaTypes] = None - previous_node_schema: Optional[MainSchemaTypes] = None - - if new_schema.has(name=migration.path.schema_kind): - new_node_schema = new_schema.get(name=migration.path.schema_kind) - - if new_node_schema and new_node_schema.id: - previous_node_schema = previous_schema.get_by_id(id=new_node_schema.id) - else: - previous_node_schema = previous_schema.get(name=migration.path.schema_kind) - - if not previous_node_schema: - raise ValueError( - f"Unable to find the previous version of the schema for {migration.path.schema_kind}, in order to run the migration." - ) - - message = SchemaMigrationPath( - branch=branch, - migration_name=migration.migration_name, - new_node_schema=new_node_schema, - previous_node_schema=previous_node_schema, - schema_path=migration.path, - ) - tasks.append(service.message_bus.rpc(message=message, response_class=SchemaMigrationPathResponse)) - - responses: list[SchemaMigrationPathResponse] = await asyncio.gather(*tasks) # type: ignore[assignment] - - for response in responses: - error_messages.extend(response.data.errors) - - return error_messages diff --git a/backend/infrahub/core/migrations/schema/tasks.py b/backend/infrahub/core/migrations/schema/tasks.py index 60b18312c4..65c331c9b9 100644 --- a/backend/infrahub/core/migrations/schema/tasks.py +++ b/backend/infrahub/core/migrations/schema/tasks.py @@ -3,16 +3,16 @@ from typing import TYPE_CHECKING, Optional from infrahub_sdk.batch import InfrahubBatch -from prefect import flow +from prefect import flow, task +from prefect.logging import get_run_logger -from infrahub.message_bus.messages.schema_migration_path import ( - SchemaMigrationPathData, -) -from infrahub.message_bus.operations.schema.migration import schema_path_migrate +from infrahub.core.branch import Branch # noqa: TCH001 +from infrahub.core.migrations import MIGRATION_MAP +from infrahub.core.path import SchemaPath # noqa: TCH001 from infrahub.services import services from infrahub.workflows.utils import add_branch_tag -from .models import SchemaApplyMigrationData # noqa: TCH001 +from .models import SchemaApplyMigrationData, SchemaMigrationPathResponseData if TYPE_CHECKING: from infrahub.core.schema import MainSchemaTypes @@ -20,8 +20,8 @@ @flow(name="schema_apply_migrations", flow_run_name="Apply schema migrations") async def schema_apply_migrations(message: SchemaApplyMigrationData) -> list[str]: - service = services.service await add_branch_tag(branch_name=message.branch.name) + log = get_run_logger() batch = InfrahubBatch() error_messages: list[str] = [] @@ -30,10 +30,7 @@ async def schema_apply_migrations(message: SchemaApplyMigrationData) -> list[str return error_messages for migration in message.migrations: - service.log.info( - f"Preparing migration for {migration.migration_name!r} ({migration.routing_key})", - branch=message.branch.name, - ) + log.info(f"Preparing migration for {migration.migration_name!r} ({migration.routing_key})") new_node_schema: Optional[MainSchemaTypes] = None previous_node_schema: Optional[MainSchemaTypes] = None @@ -51,7 +48,8 @@ async def schema_apply_migrations(message: SchemaApplyMigrationData) -> list[str f"Unable to find the previous version of the schema for {migration.path.schema_kind}, in order to run the migration." ) - msg = SchemaMigrationPathData( + batch.add( + task=schema_path_migrate, branch=message.branch, migration_name=migration.migration_name, new_node_schema=new_node_schema, @@ -59,9 +57,55 @@ async def schema_apply_migrations(message: SchemaApplyMigrationData) -> list[str schema_path=migration.path, ) - batch.add(task=schema_path_migrate, message=msg) - async for _, result in batch.execute(): error_messages.extend(result.errors) return error_messages + + +@task( + name="schema-path-migrate", + task_run_name="Migrate Schema Path {migration_name} on {branch.name}", + description="Apply a given migration to the database", + retries=3, +) +async def schema_path_migrate( + branch: Branch, + migration_name: str, + schema_path: SchemaPath, + new_node_schema: MainSchemaTypes | None = None, + previous_node_schema: MainSchemaTypes | None = None, +) -> SchemaMigrationPathResponseData: + service = services.service + log = get_run_logger() + + async with service.database.start_session() as db: + node_kind = None + if new_node_schema: + node_kind = new_node_schema.kind + elif previous_node_schema: + node_kind = previous_node_schema.kind + + log.info( + f"Migration for {node_kind} starting {schema_path.get_path()}", + ) + migration_class = MIGRATION_MAP.get(migration_name) + if not migration_class: + raise ValueError(f"Unable to find the migration class for {migration_name}") + + migration = migration_class( # type: ignore[call-arg] + new_node_schema=new_node_schema, # type: ignore[arg-type] + previous_node_schema=previous_node_schema, # type: ignore[arg-type] + schema_path=schema_path, + ) + execution_result = await migration.execute(db=db, branch=branch) + + log.info(f"Migration completed for {migration_name}") + log.debug(f"execution_result {execution_result}") + + return SchemaMigrationPathResponseData( + migration_name=migration_name, + schema_path=schema_path, + errors=execution_result.errors, + nbr_migrations_executed=execution_result.nbr_migrations_executed, + ) diff --git a/backend/infrahub/core/validators/models/validate_migration.py b/backend/infrahub/core/validators/models/validate_migration.py index b6d61cc0a8..534d645cd2 100644 --- a/backend/infrahub/core/validators/models/validate_migration.py +++ b/backend/infrahub/core/validators/models/validate_migration.py @@ -1,8 +1,11 @@ -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field from infrahub.core.branch import Branch from infrahub.core.models import SchemaUpdateConstraintInfo +from infrahub.core.path import SchemaPath from infrahub.core.schema.schema_branch import SchemaBranch +from infrahub.core.validators.model import SchemaViolation +from infrahub.message_bus import InfrahubResponseData class SchemaValidateMigrationData(BaseModel): @@ -12,3 +15,9 @@ class SchemaValidateMigrationData(BaseModel): branch: Branch schema_branch: SchemaBranch constraints: list[SchemaUpdateConstraintInfo] + + +class SchemaValidatorPathResponseData(InfrahubResponseData): + violations: list[SchemaViolation] = Field(default_factory=list) + constraint_name: str + schema_path: SchemaPath diff --git a/backend/infrahub/core/validators/tasks.py b/backend/infrahub/core/validators/tasks.py index c3abfa1f96..da016faaee 100644 --- a/backend/infrahub/core/validators/tasks.py +++ b/backend/infrahub/core/validators/tasks.py @@ -1,16 +1,20 @@ from __future__ import annotations from infrahub_sdk.batch import InfrahubBatch -from prefect import flow +from prefect import flow, task -from infrahub.message_bus.messages.schema_validator_path import ( - SchemaValidatorPathData, +from infrahub.core.branch import Branch # noqa: TCH001 +from infrahub.core.path import SchemaPath # noqa: TCH001 +from infrahub.core.schema import GenericSchema, NodeSchema # noqa: TCH001 +from infrahub.core.validators.aggregated_checker import AggregatedConstraintChecker +from infrahub.core.validators.model import ( + SchemaConstraintValidatorRequest, ) -from infrahub.message_bus.operations.schema.validator import schema_path_validate +from infrahub.dependencies.registry import get_component_registry from infrahub.services import services from infrahub.workflows.utils import add_branch_tag -from .models.validate_migration import SchemaValidateMigrationData # noqa: TCH001 +from .models.validate_migration import SchemaValidateMigrationData, SchemaValidatorPathResponseData @flow(name="schema_validate_migrations", flow_run_name="Validate schema migrations") @@ -31,16 +35,49 @@ async def schema_validate_migrations(message: SchemaValidateMigrationData) -> li routing_key=constraint.routing_key, ) - msg = SchemaValidatorPathData( + batch.add( + task=schema_path_validate, branch=message.branch, constraint_name=constraint.constraint_name, node_schema=message.schema_branch.get(name=constraint.path.schema_kind), schema_path=constraint.path, ) - batch.add(task=schema_path_validate, message=msg) async for _, result in batch.execute(): for violation in result.violations: error_messages.append(violation.message) return error_messages + + +@task( + name="schema-path-validate", + task_run_name="Validate schema path {constraint_name} in {branch.name}", + description="Validate if a given migration is compatible with the existing data", + retries=3, +) +async def schema_path_validate( + branch: Branch, + constraint_name: str, + node_schema: NodeSchema | GenericSchema, + schema_path: SchemaPath, +) -> SchemaValidatorPathResponseData: + service = services.service + + async with service.database.start_session() as db: + constraint_request = SchemaConstraintValidatorRequest( + branch=branch, + constraint_name=constraint_name, + node_schema=node_schema, + schema_path=schema_path, + ) + + component_registry = get_component_registry() + aggregated_constraint_checker = await component_registry.get_component( + AggregatedConstraintChecker, db=db, branch=branch + ) + violations = await aggregated_constraint_checker.run_constraints(constraint_request) + + return SchemaValidatorPathResponseData( + violations=violations, constraint_name=constraint_name, schema_path=schema_path + ) diff --git a/backend/infrahub/message_bus/messages/__init__.py b/backend/infrahub/message_bus/messages/__init__.py index de34932e1a..47e6a351d3 100644 --- a/backend/infrahub/message_bus/messages/__init__.py +++ b/backend/infrahub/message_bus/messages/__init__.py @@ -24,8 +24,6 @@ from .request_proposedchange_pipeline import RequestProposedChangePipeline from .request_repository_checks import RequestRepositoryChecks from .request_repository_userchecks import RequestRepositoryUserChecks -from .schema_migration_path import SchemaMigrationPath, SchemaMigrationPathResponse -from .schema_validator_path import SchemaValidatorPath, SchemaValidatorPathResponse from .send_echo_request import SendEchoRequest, SendEchoRequestResponse MESSAGE_MAP: dict[str, type[InfrahubMessage]] = { @@ -44,8 +42,6 @@ "finalize.validator.execution": FinalizeValidatorExecution, "git.file.get": GitFileGet, "git.repository.connectivity": GitRepositoryConnectivity, - "schema.migration.path": SchemaMigrationPath, - "schema.validator.path": SchemaValidatorPath, "refresh.git.fetch": RefreshGitFetch, "refresh.registry.branches": RefreshRegistryBranches, "refresh.registry.rebased_branch": RefreshRegistryRebasedBranch, @@ -61,8 +57,6 @@ RESPONSE_MAP: dict[str, type[InfrahubResponse]] = { "git.file.get": GitFileGetResponse, "send.echo.request": SendEchoRequestResponse, - "schema.migration.path": SchemaMigrationPathResponse, - "schema.validator.path": SchemaValidatorPathResponse, } PRIORITY_MAP = { diff --git a/backend/infrahub/message_bus/messages/schema_migration_path.py b/backend/infrahub/message_bus/messages/schema_migration_path.py deleted file mode 100644 index c477b0346b..0000000000 --- a/backend/infrahub/message_bus/messages/schema_migration_path.py +++ /dev/null @@ -1,38 +0,0 @@ -from __future__ import annotations - -from typing import Optional - -from pydantic import BaseModel, Field - -from infrahub.core.branch import Branch # noqa: TCH001 -from infrahub.core.path import SchemaPath # noqa: TCH001 -from infrahub.core.schema import MainSchemaTypes # noqa: TCH001 -from infrahub.message_bus import InfrahubMessage, InfrahubResponse, InfrahubResponseData - -ROUTING_KEY = "schema.migration.path" - - -class SchemaMigrationPathData(BaseModel): - branch: Branch = Field(..., description="The name of the branch to target") - migration_name: str = Field(..., description="The name of the migration to run") - new_node_schema: Optional[MainSchemaTypes] = Field(None, description="new Schema of Node or Generic to process") - previous_node_schema: Optional[MainSchemaTypes] = Field( - None, description="Previous Schema of Node or Generic to process" - ) - schema_path: SchemaPath = Field(..., description="SchemaPath to the element of the schema to migrate") - - -class SchemaMigrationPath(SchemaMigrationPathData, InfrahubMessage): - pass - - -class SchemaMigrationPathResponseData(InfrahubResponseData): - errors: list[str] = Field(default_factory=list) - migration_name: Optional[str] = None - nbr_migrations_executed: Optional[int] = None - schema_path: Optional[SchemaPath] = None - - -class SchemaMigrationPathResponse(InfrahubResponse): - routing_key: str = ROUTING_KEY - data: SchemaMigrationPathResponseData diff --git a/backend/infrahub/message_bus/messages/schema_validator_path.py b/backend/infrahub/message_bus/messages/schema_validator_path.py deleted file mode 100644 index 07215e601d..0000000000 --- a/backend/infrahub/message_bus/messages/schema_validator_path.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import annotations - -from pydantic import BaseModel, Field - -from infrahub.core.branch import Branch # noqa: TCH001 -from infrahub.core.path import SchemaPath # noqa: TCH001 -from infrahub.core.schema import MainSchemaTypes # noqa: TCH001 -from infrahub.core.validators.model import SchemaViolation # noqa: TCH001 -from infrahub.message_bus import InfrahubMessage, InfrahubResponse, InfrahubResponseData - -ROUTING_KEY = "schema.validator.path" - - -class SchemaValidatorPathData(BaseModel): - branch: Branch = Field(..., description="The name of the branch to target") - constraint_name: str = Field(..., description="The name of the constraint to validate") - node_schema: MainSchemaTypes = Field(..., description="Schema of Node or Generic to validate") - schema_path: SchemaPath = Field(..., description="SchemaPath to the element of the schema to validate") - - -class SchemaValidatorPath(SchemaValidatorPathData, InfrahubMessage): - pass - - -class SchemaValidatorPathResponseData(InfrahubResponseData): - violations: list[SchemaViolation] = Field(default_factory=list) - constraint_name: str - schema_path: SchemaPath - - -class SchemaValidatorPathResponse(InfrahubResponse): - routing_key: str = ROUTING_KEY - data: SchemaValidatorPathResponseData diff --git a/backend/infrahub/message_bus/operations/__init__.py b/backend/infrahub/message_bus/operations/__init__.py index 33e7324675..3869de60c8 100644 --- a/backend/infrahub/message_bus/operations/__init__.py +++ b/backend/infrahub/message_bus/operations/__init__.py @@ -11,7 +11,6 @@ git, refresh, requests, - schema, send, ) from infrahub.message_bus.types import MessageTTL @@ -42,8 +41,6 @@ "request.repository.checks": requests.repository.checks, "request.repository.user_checks": requests.repository.user_checks, "send.echo.request": send.echo.request, - "schema.migration.path": schema.migration.path, - "schema.validator.path": schema.validator.path, } diff --git a/backend/infrahub/message_bus/operations/schema/__init__.py b/backend/infrahub/message_bus/operations/schema/__init__.py deleted file mode 100644 index fce78d4115..0000000000 --- a/backend/infrahub/message_bus/operations/schema/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from . import migration, validator - -__all__ = ["migration", "validator"] diff --git a/backend/infrahub/message_bus/operations/schema/migration.py b/backend/infrahub/message_bus/operations/schema/migration.py deleted file mode 100644 index 9adf96b324..0000000000 --- a/backend/infrahub/message_bus/operations/schema/migration.py +++ /dev/null @@ -1,117 +0,0 @@ -from prefect import flow, task -from prefect.logging import get_run_logger -from prefect.runtime import task_run - -from infrahub.core.migrations import MIGRATION_MAP -from infrahub.log import get_logger -from infrahub.message_bus.messages.schema_migration_path import ( - SchemaMigrationPath, - SchemaMigrationPathData, - SchemaMigrationPathResponse, - SchemaMigrationPathResponseData, -) -from infrahub.services import InfrahubServices, services - -log = get_logger() - - -@flow(name="schema-migration-path") -async def path(message: SchemaMigrationPath, service: InfrahubServices) -> None: - async with service.database.start_session() as db: - node_kind = None - if message.new_node_schema: - node_kind = message.new_node_schema.kind - elif message.previous_node_schema: - node_kind = message.previous_node_schema.kind - - service.log.info( - "schema.migration.path - received", - migration=message.migration_name, - node_kind=node_kind, - path=message.schema_path.get_path(), - ) - migration_class = MIGRATION_MAP.get(message.migration_name) - if not migration_class: - raise ValueError(f"Unable to find the migration class for {message.migration_name}") - - migration = migration_class( - new_node_schema=message.new_node_schema, - previous_node_schema=message.previous_node_schema, - schema_path=message.schema_path, - ) - execution_result = await migration.execute(db=db, branch=message.branch) - - service.log.info( - "schema.migration.path - completed", - migration=message.migration_name, - node_kind=node_kind, - path=message.schema_path.get_path(), - result=execution_result, - ) - if message.reply_requested: - response = SchemaMigrationPathResponse( - data=SchemaMigrationPathResponseData( - migration_name=message.migration_name, - schema_path=message.schema_path, - errors=execution_result.errors, - nbr_migrations_executed=execution_result.nbr_migrations_executed, - ) - ) - await service.reply(message=response, initiator=message) - - -def generate_task_name() -> str: - task_name = task_run.task_name - message: SchemaMigrationPathData = task_run.parameters["message"] - return f"{task_name}-{message.branch.name}-{message.migration_name}" - - -@task( - task_run_name=generate_task_name, - description="Apply a given migration to the database", - retries=3, -) -async def schema_path_migrate(message: SchemaMigrationPathData) -> SchemaMigrationPathResponseData: - service = services.service - logger = get_run_logger() - - async with service.database.start_session() as db: - node_kind = None - if message.new_node_schema: - node_kind = message.new_node_schema.kind - elif message.previous_node_schema: - node_kind = message.previous_node_schema.kind - - service.log.info( - "schema.migration.path - received", - migration=message.migration_name, - node_kind=node_kind, - path=message.schema_path.get_path(), - ) - migration_class = MIGRATION_MAP.get(message.migration_name) - if not migration_class: - raise ValueError(f"Unable to find the migration class for {message.migration_name}") - - migration = migration_class( - new_node_schema=message.new_node_schema, - previous_node_schema=message.previous_node_schema, - schema_path=message.schema_path, - ) - execution_result = await migration.execute(db=db, branch=message.branch) - - logger.info(f"Migration completed for {message.migration_name}") - - service.log.info( - "schema.migration.path - completed", - migration=message.migration_name, - node_kind=node_kind, - path=message.schema_path.get_path(), - result=execution_result, - ) - - return SchemaMigrationPathResponseData( - migration_name=message.migration_name, - schema_path=message.schema_path, - errors=execution_result.errors, - nbr_migrations_executed=execution_result.nbr_migrations_executed, - ) diff --git a/backend/infrahub/message_bus/operations/schema/validator.py b/backend/infrahub/message_bus/operations/schema/validator.py deleted file mode 100644 index 02ee3983be..0000000000 --- a/backend/infrahub/message_bus/operations/schema/validator.py +++ /dev/null @@ -1,88 +0,0 @@ -from prefect import flow, task -from prefect.runtime import task_run - -from infrahub.core.validators.aggregated_checker import AggregatedConstraintChecker -from infrahub.core.validators.model import SchemaConstraintValidatorRequest -from infrahub.dependencies.registry import get_component_registry -from infrahub.log import get_logger -from infrahub.message_bus.messages.schema_validator_path import ( - SchemaValidatorPath, - SchemaValidatorPathData, - SchemaValidatorPathResponse, - SchemaValidatorPathResponseData, -) -from infrahub.services import InfrahubServices, services - -log = get_logger() - - -@flow(name="schema-validator-path") -async def path(message: SchemaValidatorPath, service: InfrahubServices) -> None: - async with service.database.start_session() as db: - log.info( - "schema.validator.path", - constraint=message.constraint_name, - node_kind=message.node_schema.kind, - path=message.schema_path.get_path(), - ) - - constraint_request = SchemaConstraintValidatorRequest( - branch=message.branch, - constraint_name=message.constraint_name, - node_schema=message.node_schema, - schema_path=message.schema_path, - ) - - component_registry = get_component_registry() - aggregated_constraint_checker = await component_registry.get_component( - AggregatedConstraintChecker, db=db, branch=message.branch - ) - violations = await aggregated_constraint_checker.run_constraints(constraint_request) - - if message.reply_requested: - response = SchemaValidatorPathResponse( - data=SchemaValidatorPathResponseData( - violations=violations, constraint_name=message.constraint_name, schema_path=message.schema_path - ) - ) - await service.reply(message=response, initiator=message) - - -def generate_task_name() -> str: - task_name = task_run.task_name - message: SchemaValidatorPathData = task_run.parameters["message"] - return f"{task_name}-{message.branch.name}-{message.constraint_name}" - - -@task( - task_run_name=generate_task_name, - description="Validate if a given migration is compatible with the existing data", - retries=3, -) -async def schema_path_validate(message: SchemaValidatorPathData) -> SchemaValidatorPathResponseData: - service = services.service - - async with service.database.start_session() as db: - log.info( - "schema.validator.path", - constraint=message.constraint_name, - node_kind=message.node_schema.kind, - path=message.schema_path.get_path(), - ) - - constraint_request = SchemaConstraintValidatorRequest( - branch=message.branch, - constraint_name=message.constraint_name, - node_schema=message.node_schema, - schema_path=message.schema_path, - ) - - component_registry = get_component_registry() - aggregated_constraint_checker = await component_registry.get_component( - AggregatedConstraintChecker, db=db, branch=message.branch - ) - violations = await aggregated_constraint_checker.run_constraints(constraint_request) - - return SchemaValidatorPathResponseData( - violations=violations, constraint_name=message.constraint_name, schema_path=message.schema_path - ) diff --git a/backend/tests/unit/api/test_40_schema_api.py b/backend/tests/unit/api/test_40_schema_api.py index 12ad155e23..05c0fd657e 100644 --- a/backend/tests/unit/api/test_40_schema_api.py +++ b/backend/tests/unit/api/test_40_schema_api.py @@ -6,6 +6,7 @@ from infrahub.core.initialization import create_branch from infrahub.core.node import Node from infrahub.core.schema import SchemaRoot, core_models +from infrahub.core.utils import count_relationships from infrahub.database import InfrahubDatabase @@ -230,6 +231,172 @@ async def test_schema_load_restricted_namespace( assert response.json()["errors"][0]["message"] == "Restricted namespace 'Internal' used on 'Timestamp'" +async def test_schema_load_endpoint_idempotent_simple( + db: InfrahubDatabase, + client: TestClient, + admin_headers, + default_branch: Branch, + prefect_test_fixture, + workflow_local, + register_core_schema_db, + authentication_base, + helper, +): + # Load the schema in the database + schema = registry.schema.get_schema_branch(name=default_branch.name) + await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) + + # Must execute in a with block to execute the startup/shutdown events + with client: + creation = client.post( + "/api/schema/load", headers=admin_headers, json={"schemas": [helper.schema_file("infra_simple_01.json")]} + ) + read = client.get("/api/schema", headers=admin_headers) + + nbr_rels = await count_relationships(db=db) + + assert creation.status_code == 200 + assert read.status_code == 200 + nodes = read.json()["nodes"] + device = [node for node in nodes if node["name"] == "Device"] + assert device + device = device[0] + attributes = {attrib["name"]: attrib["order_weight"] for attrib in device["attributes"]} + relationships = {attrib["name"]: attrib["order_weight"] for attrib in device["relationships"]} + assert attributes["name"] == 1000 + assert attributes["description"] == 900 + assert attributes["type"] == 3000 + assert relationships["interfaces"] == 450 + assert relationships["tags"] == 7000 + + creation = client.post( + "/api/schema/load", headers=admin_headers, json={"schemas": [helper.schema_file("infra_simple_01.json")]} + ) + read = client.get("/api/schema", headers=admin_headers) + + assert creation.status_code == 200 + assert read.status_code == 200 + + assert nbr_rels == await count_relationships(db=db) + + +async def test_schema_load_endpoint_valid_with_generics( + db: InfrahubDatabase, + client: TestClient, + admin_headers, + default_branch: Branch, + prefect_test_fixture, + workflow_local, + register_core_schema_db, + authentication_base, + helper, +): + # Load the schema in the database + schema = registry.schema.get_schema_branch(name=default_branch.name) + await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) + + # Must execute in a with block to execute the startup/shutdown events + with client: + response1 = client.post( + "/api/schema/load", + headers=admin_headers, + json={"schemas": [helper.schema_file("infra_w_generics_01.json")]}, + ) + assert response1.status_code == 200 + + response2 = client.get("/api/schema", headers=admin_headers) + assert response2.status_code == 200 + + schema = response2.json() + assert len(schema["generics"]) == len(core_models.get("generics")) + 1 + + +async def test_schema_load_endpoint_idempotent_with_generics( + db: InfrahubDatabase, + client: TestClient, + admin_headers, + default_branch: Branch, + prefect_test_fixture, + workflow_local, + register_core_schema_db, + authentication_base, + helper, +): + # Load the schema in the database + schema = registry.schema.get_schema_branch(name=default_branch.name) + await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) + + # Must execute in a with block to execute the startup/shutdown events + with client: + response1 = client.post( + "/api/schema/load", + headers=admin_headers, + json={"schemas": [helper.schema_file("infra_w_generics_01.json")]}, + ) + assert response1.json()["schema_updated"] + assert response1.status_code == 200 + + response2 = client.get("/api/schema", headers=admin_headers) + assert response2.status_code == 200 + + schema = response2.json() + assert len(schema["generics"]) == len(core_models.get("generics")) + 1 + + nbr_rels = await count_relationships(db=db) + + response3 = client.post( + "/api/schema/load", + headers=admin_headers, + json={"schemas": [helper.schema_file("infra_w_generics_01.json")]}, + ) + assert response3.json()["schema_updated"] is False + assert response3.status_code == 200 + + response4 = client.get("/api/schema", headers=admin_headers) + assert response4.status_code == 200 + + nbr_rels_after = await count_relationships(db=db) + assert nbr_rels == nbr_rels_after + + +async def test_schema_load_endpoint_valid_with_extensions( + db: InfrahubDatabase, + client: TestClient, + admin_headers, + rpc_bus, + default_branch: Branch, + prefect_test_fixture, + workflow_local, + authentication_base, + helper, +): + # Load the schema in the database + schema = registry.schema.get_schema_branch(name=default_branch.name) + await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) + + org_schema = registry.schema.get(name="CoreOrganization", branch=default_branch.name) + initial_nbr_relationships = len(org_schema.relationships) + + schema = registry.schema.get_schema_branch(name=default_branch.name) + await registry.schema.load_schema_to_db( + db=db, schema=schema, branch=default_branch, limit=["CoreOrganization", "InfraSite"] + ) + + # Must execute in a with block to execute the startup/shutdown events + with client: + response = client.post( + "/api/schema/load", + headers=admin_headers, + json={"schemas": [helper.schema_file("infra_w_extensions_01.json")]}, + ) + + assert response.json()["schema_updated"] + assert response.status_code == 200 + + org_schema = registry.schema.get(name="CoreOrganization", branch=default_branch.name) + assert len(org_schema.relationships) == initial_nbr_relationships + 1 + + async def test_schema_load_endpoint_not_valid_simple_02( db: InfrahubDatabase, client: TestClient, diff --git a/backend/tests/unit/core/constraint_validators/test_attribute_regex_update.py b/backend/tests/unit/core/constraint_validators/test_attribute_regex_update.py index d937cda770..7fef2ac677 100644 --- a/backend/tests/unit/core/constraint_validators/test_attribute_regex_update.py +++ b/backend/tests/unit/core/constraint_validators/test_attribute_regex_update.py @@ -1,5 +1,3 @@ -from infrahub_sdk import InfrahubClient - from infrahub.core import registry from infrahub.core.branch import Branch from infrahub.core.constants import NULL_VALUE, PathType, SchemaPathType @@ -9,9 +7,6 @@ from infrahub.core.validators.attribute.regex import AttributeRegexChecker, AttributeRegexUpdateValidatorQuery from infrahub.core.validators.model import SchemaConstraintValidatorRequest from infrahub.database import InfrahubDatabase -from infrahub.message_bus.messages import SchemaValidatorPathResponse -from infrahub.message_bus.messages.schema_validator_path import SchemaValidatorPath -from infrahub.services import InfrahubServices async def test_query( @@ -183,27 +178,3 @@ async def test_validator( data_paths = grouped_data_paths[0].get_all_data_paths() assert len(data_paths) == 1 assert data_paths[0].node_id == person_john_main.id - - -async def test_rpc( - db: InfrahubDatabase, default_branch: Branch, car_accord_main: Node, car_volt_main: Node, person_john_main, helper -): - person_schema = registry.schema.get(name="TestPerson", branch=default_branch) - name_attr = person_schema.get_attribute(name="name") - name_attr.regex = r"^[A-Z]+$" - registry.schema.set(name="TestPerson", schema=person_schema, branch=default_branch.name) - - message = SchemaValidatorPath( - constraint_name="attribute.regex.update", - node_schema=person_schema, - schema_path=SchemaPath(path_type=SchemaPathType.ATTRIBUTE, schema_kind="TestPerson", field_name="name"), - branch=default_branch, - ) - - bus_simulator = helper.get_message_bus_simulator() - service = InfrahubServices(message_bus=bus_simulator, client=InfrahubClient(), database=db) - bus_simulator.service = service - - response = await service.message_bus.rpc(message=message, response_class=SchemaValidatorPathResponse) - assert len(response.data.violations) == 1 - assert response.data.violations[0].display_label == f"Node (TestPerson: {person_john_main.id})" diff --git a/backend/tests/unit/core/migrations/schema/test_node_attribute_add.py b/backend/tests/unit/core/migrations/schema/test_node_attribute_add.py index 91addd687c..c215d35547 100644 --- a/backend/tests/unit/core/migrations/schema/test_node_attribute_add.py +++ b/backend/tests/unit/core/migrations/schema/test_node_attribute_add.py @@ -1,8 +1,6 @@ import uuid import pytest -from infrahub_sdk import InfrahubClient -from infrahub_sdk.uuidt import UUIDT from infrahub.core import registry from infrahub.core.branch import Branch @@ -20,9 +18,6 @@ from infrahub.core.timestamp import Timestamp from infrahub.core.utils import count_nodes from infrahub.database import InfrahubDatabase -from infrahub.message_bus import Meta -from infrahub.message_bus.messages import SchemaMigrationPath, SchemaMigrationPathResponse -from infrahub.services import InfrahubServices @pytest.fixture @@ -158,30 +153,3 @@ async def test_migration(db: InfrahubDatabase, default_branch, init_database, sc assert execution_result.nbr_migrations_executed == 5 assert await count_nodes(db=db, label="TestCar") == 5 assert await count_nodes(db=db, label="Attribute") == 5 - - -async def test_rpc(db: InfrahubDatabase, default_branch, init_database, schema_aware, helper): - node = schema_aware - correlation_id = str(UUIDT()) - message = SchemaMigrationPath( - migration_name="node.attribute.add", - new_node_schema=node, - previous_node_schema=node, - schema_path=SchemaPath(path_type=SchemaPathType.ATTRIBUTE, schema_kind="TestCar", field_name="nbr_doors"), - branch=default_branch, - meta=Meta(reply_to="ci-testing", correlation_id=correlation_id), - ) - - bus_simulator = helper.get_message_bus_simulator() - service = InfrahubServices(message_bus=bus_simulator, client=InfrahubClient(), database=db) - bus_simulator.service = service - - assert await count_nodes(db=db, label="TestCar") == 5 - assert await count_nodes(db=db, label="Attribute") == 0 - - response = await service.message_bus.rpc(message=message, response_class=SchemaMigrationPathResponse) - assert response.passed - assert not response.data.errors - - assert await count_nodes(db=db, label="TestCar") == 5 - assert await count_nodes(db=db, label="Attribute") == 5 diff --git a/backend/tests/unit/core/migrations/schema/test_node_remove.py b/backend/tests/unit/core/migrations/schema/test_node_remove.py index 361a503ef7..127d0fe75a 100644 --- a/backend/tests/unit/core/migrations/schema/test_node_remove.py +++ b/backend/tests/unit/core/migrations/schema/test_node_remove.py @@ -1,6 +1,3 @@ -from infrahub_sdk import InfrahubClient -from infrahub_sdk.uuidt import UUIDT - from infrahub.core import registry from infrahub.core.branch import Branch from infrahub.core.constants import SchemaPathType @@ -12,9 +9,6 @@ from infrahub.core.path import SchemaPath from infrahub.core.utils import count_nodes, count_relationships from infrahub.database import InfrahubDatabase -from infrahub.message_bus import Meta -from infrahub.message_bus.messages import SchemaMigrationPath, SchemaMigrationPathResponse -from infrahub.services import InfrahubServices async def test_query_out_default_branch(db: InfrahubDatabase, default_branch: Branch, car_accord_main, car_camry_main): @@ -102,32 +96,3 @@ async def test_migration(db: InfrahubDatabase, default_branch: Branch, car_accor assert execution_result.nbr_migrations_executed == 2 assert await count_relationships(db=db) == count_rels + 14 assert await count_nodes(db=db, label="TestCar") == 2 - - -async def test_rpc(db: InfrahubDatabase, default_branch: Branch, car_accord_main, car_camry_main, helper): - schema = registry.schema.get_schema_branch(name=default_branch.name) - candidate_schema = schema.duplicate() - candidate_schema.delete(name="TestCar") - - correlation_id = str(UUIDT()) - message = SchemaMigrationPath( - migration_name="node.remove", - new_node_schema=None, - previous_node_schema=schema.get(name="TestCar"), - schema_path=SchemaPath(path_type=SchemaPathType.NODE, schema_kind="TestCar"), - branch=default_branch, - meta=Meta(reply_to="ci-testing", correlation_id=correlation_id), - ) - - bus_simulator = helper.get_message_bus_simulator() - service = InfrahubServices(message_bus=bus_simulator, client=InfrahubClient(), database=db) - bus_simulator.service = service - - assert await count_nodes(db=db, label="TestCar") == 2 - - response = await service.message_bus.rpc(message=message, response_class=SchemaMigrationPathResponse) - assert response.passed - assert not response.data.errors - assert response.data.nbr_migrations_executed == 2 - - assert await count_nodes(db=db, label="TestCar") == 2 diff --git a/backend/tests/unit/core/migrations/schema/test_runner.py b/backend/tests/unit/core/migrations/schema/test_runner.py deleted file mode 100644 index 8cc83d7a74..0000000000 --- a/backend/tests/unit/core/migrations/schema/test_runner.py +++ /dev/null @@ -1,49 +0,0 @@ -from infrahub_sdk import InfrahubClient - -from infrahub.core import registry -from infrahub.core.branch import Branch -from infrahub.core.constants import SchemaPathType -from infrahub.core.migrations.schema.runner import schema_migrations_runner -from infrahub.core.models import SchemaUpdateMigrationInfo -from infrahub.core.node import Node -from infrahub.core.path import SchemaPath -from infrahub.core.schema import AttributeSchema -from infrahub.core.utils import count_nodes -from infrahub.database import InfrahubDatabase -from infrahub.services import InfrahubServices - - -async def test_schema_migrations_runner( - db: InfrahubDatabase, default_branch: Branch, car_accord_main: Node, car_volt_main: Node, person_john_main, helper -): - schema = registry.schema.get_schema_branch(name=default_branch.name).duplicate() - person_schema = schema.get(name="TestPerson") - person_schema.attributes.append(AttributeSchema(name="color", kind="Text", optional=True)) - schema.set(name="TestPerson", schema=person_schema) - schema.process() - - migrations = [ - SchemaUpdateMigrationInfo( - migration_name="node.attribute.add", - path=SchemaPath(path_type=SchemaPathType.ATTRIBUTE, schema_kind="TestPerson", field_name="color"), - ) - ] - - bus_simulator = helper.get_message_bus_simulator() - service = InfrahubServices(message_bus=bus_simulator, client=InfrahubClient(), database=db) - bus_simulator.service = service - - assert await count_nodes(db=db, label="TestPerson") == 1 - assert await count_nodes(db=db, label="Attribute") == 12 - - errors = await schema_migrations_runner( - branch=default_branch, - new_schema=schema, - previous_schema=registry.schema.get_schema_branch(name=default_branch.name), - migrations=migrations, - service=service, - ) - assert errors == [] - - assert await count_nodes(db=db, label="TestPerson") == 1 - assert await count_nodes(db=db, label="Attribute") == 13 diff --git a/docs/docs/reference/message-bus-events.mdx b/docs/docs/reference/message-bus-events.mdx index 8accc62bd0..3efeebd5ef 100644 --- a/docs/docs/reference/message-bus-events.mdx +++ b/docs/docs/reference/message-bus-events.mdx @@ -350,48 +350,6 @@ For more detailed explanations on how to use these events within Infrahub, see t - -### Schema Migration - - - -#### Event schema.migration.path - - -**Priority**: 3 - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **branch** | The name of the branch to target | N/A | None | -| **migration_name** | The name of the migration to run | string | None | -| **new_node_schema** | new Schema of Node or Generic to process | N/A | None | -| **previous_node_schema** | Previous Schema of Node or Generic to process | N/A | None | -| **schema_path** | SchemaPath to the element of the schema to migrate | N/A | None | - - - -### Schema Validator - - - -#### Event schema.validator.path - - -**Priority**: 3 - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **branch** | The name of the branch to target | N/A | None | -| **constraint_name** | The name of the constraint to validate | string | None | -| **node_schema** | Schema of Node or Generic to validate | N/A | None | -| **schema_path** | SchemaPath to the element of the schema to validate | N/A | None | - - - ### Refresh Git @@ -954,50 +912,6 @@ For more detailed explanations on how to use these events within Infrahub, see t - -### Schema Migration - - - -#### Event schema.migration.path - - -**Priority**: 3 - - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **branch** | The name of the branch to target | N/A | None | -| **migration_name** | The name of the migration to run | string | None | -| **new_node_schema** | new Schema of Node or Generic to process | N/A | None | -| **previous_node_schema** | Previous Schema of Node or Generic to process | N/A | None | -| **schema_path** | SchemaPath to the element of the schema to migrate | N/A | None | - - - -### Schema Validator - - - -#### Event schema.validator.path - - -**Priority**: 3 - - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **branch** | The name of the branch to target | N/A | None | -| **constraint_name** | The name of the constraint to validate | string | None | -| **node_schema** | Schema of Node or Generic to validate | N/A | None | -| **schema_path** | SchemaPath to the element of the schema to validate | N/A | None | - - - ### Refresh Git From 13e0fa03effba42d60aac2bf65a9bbbd7e14e4f8 Mon Sep 17 00:00:00 2001 From: Damien Garros Date: Mon, 11 Nov 2024 08:27:01 +0100 Subject: [PATCH 2/4] Remove schema_validators_checker --- backend/infrahub/core/validators/checker.py | 71 ---------- .../operations/requests/proposed_change.py | 121 ++++++++++++++++++ .../constraint_validators/test_checker.py | 39 ------ 3 files changed, 121 insertions(+), 110 deletions(-) delete mode 100644 backend/infrahub/core/validators/checker.py delete mode 100644 backend/tests/unit/core/constraint_validators/test_checker.py diff --git a/backend/infrahub/core/validators/checker.py b/backend/infrahub/core/validators/checker.py deleted file mode 100644 index 1a7d3d1034..0000000000 --- a/backend/infrahub/core/validators/checker.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import annotations - -import asyncio -from typing import TYPE_CHECKING - -from infrahub.message_bus.messages import MESSAGE_MAP, RESPONSE_MAP - -if TYPE_CHECKING: - from infrahub.core.branch import Branch - from infrahub.core.models import SchemaUpdateConstraintInfo - from infrahub.core.schema.schema_branch import SchemaBranch - from infrahub.message_bus.messages.schema_validator_path import SchemaValidatorPathResponse - from infrahub.services import InfrahubServices - - -async def schema_validators_checker( - branch: Branch, schema: SchemaBranch, constraints: list[SchemaUpdateConstraintInfo], service: InfrahubServices -) -> tuple[list[str], list[SchemaValidatorPathResponse]]: - tasks = [] - error_messages: list[str] = [] - - if not constraints: - return error_messages, [] - - for constraint in constraints: - service.log.info( - f"Preparing validator for constraint {constraint.constraint_name!r} ({constraint.routing_key})", - branch=branch.name, - constraint_name=constraint.constraint_name, - routing_key=constraint.routing_key, - ) - message_class = MESSAGE_MAP.get(constraint.routing_key, None) - response_class = RESPONSE_MAP.get(constraint.routing_key, None) - - if not message_class: - raise ValueError( - f"Unable to find the message for {constraint.constraint_name!r} ({constraint.routing_key})" - ) - if not response_class: - raise ValueError( - f"Unable to find the response for {constraint.constraint_name!r} ({constraint.routing_key})" - ) - - message = message_class( # type: ignore[call-arg] - branch=branch, - constraint_name=constraint.constraint_name, - node_schema=schema.get(name=constraint.path.schema_kind), - schema_path=constraint.path, - ) - tasks.append(service.message_bus.rpc(message=message, response_class=response_class)) - - if not tasks: - return error_messages, [] - - responses: list[SchemaValidatorPathResponse] = await asyncio.gather(*tasks) # type: ignore[assignment] - - for response in responses: - if not response.passed: - if response.initial_message: - error_messages.append( - f"Unable to execute the validator for {response.initial_message.get('constraint_name')!r}: " - + ", ".join(response.errors) - ) - else: - error_messages.append("Unknown error while executing the validators:" + ", ".join(response.errors)) - continue - - for violation in response.data.violations: # type: ignore[union-attr] - error_messages.append(violation.message) - - return error_messages, responses diff --git a/backend/infrahub/message_bus/operations/requests/proposed_change.py b/backend/infrahub/message_bus/operations/requests/proposed_change.py index 6bbc25d113..3b43f61a58 100644 --- a/backend/infrahub/message_bus/operations/requests/proposed_change.py +++ b/backend/infrahub/message_bus/operations/requests/proposed_change.py @@ -9,6 +9,7 @@ from infrahub.core.constants import CheckType, InfrahubKind, RepositoryInternalStatus from infrahub.core.diff.coordinator import DiffCoordinator from infrahub.core.registry import registry +from infrahub.core.validators.determiner import ConstraintValidatorDeterminer from infrahub.dependencies.registry import get_component_registry from infrahub.git.repository import InfrahubRepository from infrahub.log import get_logger @@ -34,6 +35,16 @@ REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY, REQUEST_PROPOSED_CHANGE_USER_TESTS, ) +from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY +from infrahub.core.validators.tasks import schema_validate_migrations +from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData + +if TYPE_CHECKING: + from infrahub_sdk.node import InfrahubNode + + from infrahub.core.models import SchemaUpdateConstraintInfo + from infrahub.core.schema.schema_branch import SchemaBranch + log = get_logger() @@ -188,6 +199,116 @@ async def pipeline(message: messages.RequestProposedChangePipeline, service: Inf name="proposed-changed-refresh-artifact", flow_run_name="Refreshing artifacts for change_proposal={message.proposed_change}", ) +@flow(name="proposed-changed-schema-integrity") +async def schema_integrity( + message: messages.RequestProposedChangeSchemaIntegrity, + service: InfrahubServices, # pylint: disable=unused-argument +) -> None: + async with service.task_report( + related_node=message.proposed_change, + title="Schema Integrity", + ): + log.info(f"Got a request to process schema integrity defined in proposed_change: {message.proposed_change}") + + # For now, we retrieve the latest schema for each branch from the registry + # In the future it would be good to generate the object SchemaUpdateValidationResult from message.branch_diff + source_schema = registry.schema.get_schema_branch(name=message.source_branch).duplicate() + dest_schema = registry.schema.get_schema_branch(name=message.destination_branch).duplicate() + + candidate_schema = dest_schema.duplicate() + candidate_schema.update(schema=source_schema) + validation_result = dest_schema.validate_update(other=candidate_schema) + + constraints_from_data_diff = await _get_proposed_change_schema_integrity_constraints( + message=message, schema=candidate_schema + ) + constraints_from_schema_diff = validation_result.constraints + constraints = set(constraints_from_data_diff + constraints_from_schema_diff) + + if not constraints: + return + + # ---------------------------------------------------------- + # Validate if the new schema is valid with the content of the database + # ---------------------------------------------------------- + source_branch = registry.get_branch_from_registry(branch=message.source_branch) + responses = await schema_validate_migrations(message=SchemaValidateMigrationData( + branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints) + )) + + # TODO we need to report a failure if an error happened during the execution of a validator + conflicts: list[SchemaConflict] = [] + for response in responses: + for violation in response.data.violations: + conflicts.append( + SchemaConflict( + name=response.data.schema_path.get_path(), + type=response.data.constraint_name, + kind=violation.node_kind, + id=violation.node_id, + path=response.data.schema_path.get_path(), + value=violation.message, + branch="placeholder", + ) + ) + + if not conflicts: + return + + async with service.database.start_transaction() as db: + object_conflict_validator_recorder = ObjectConflictValidatorRecorder( + db=db, + validator_kind=InfrahubKind.SCHEMAVALIDATOR, + validator_label="Schema Integrity", + check_schema_kind=InfrahubKind.SCHEMACHECK, + ) + await object_conflict_validator_recorder.record_conflicts( + proposed_change_id=message.proposed_change, conflicts=conflicts + ) + + +@flow(name="proposed-changed-repository-check") +async def repository_checks(message: messages.RequestProposedChangeRepositoryChecks, service: InfrahubServices) -> None: + async with service.task_report( + related_node=message.proposed_change, + title=f"Evaluating Repository Checks {len(message.branch_diff.repositories)} repositories", + ) as task_report: + log.info(f"Got a request to process checks defined in proposed_change: {message.proposed_change}") + events: list[InfrahubMessage] = [] + for repository in message.branch_diff.repositories: + log_line = "Skipping merge conflict checks for data only branch" + if ( + message.source_branch_sync_with_git + and not repository.read_only + and repository.internal_status == RepositoryInternalStatus.ACTIVE.value + ): + events.append( + messages.RequestRepositoryChecks( + proposed_change=message.proposed_change, + repository=repository.repository_id, + source_branch=message.source_branch, + target_branch=message.destination_branch, + ) + ) + log_line = "Requesting merge conflict checks" + await task_report.info(f"{repository.repository_name}: {log_line}") + events.append( + messages.RequestRepositoryUserChecks( + proposed_change=message.proposed_change, + repository=repository.repository_id, + source_branch=message.source_branch, + source_branch_sync_with_git=message.source_branch_sync_with_git, + target_branch=message.destination_branch, + branch_diff=message.branch_diff, + ) + ) + await task_report.info(f"{repository.repository_name}: Requesting user checks") + for event in events: + event.assign_meta(parent=message) + await service.send(message=event) + + +@flow(name="proposed-changed-refresh-artifact") async def refresh_artifacts(message: messages.RequestProposedChangeRefreshArtifacts, service: InfrahubServices) -> None: definition_information = await service.client.execute_graphql( query=GATHER_ARTIFACT_DEFINITIONS, diff --git a/backend/tests/unit/core/constraint_validators/test_checker.py b/backend/tests/unit/core/constraint_validators/test_checker.py deleted file mode 100644 index 995fc10bc4..0000000000 --- a/backend/tests/unit/core/constraint_validators/test_checker.py +++ /dev/null @@ -1,39 +0,0 @@ -from infrahub_sdk import InfrahubClient - -from infrahub.core import registry -from infrahub.core.branch import Branch -from infrahub.core.constants import SchemaPathType -from infrahub.core.models import SchemaUpdateConstraintInfo -from infrahub.core.node import Node -from infrahub.core.path import SchemaPath -from infrahub.core.validators.checker import schema_validators_checker -from infrahub.database import InfrahubDatabase -from infrahub.services import InfrahubServices - - -async def test_schema_validators_checker( - db: InfrahubDatabase, default_branch: Branch, car_accord_main: Node, car_volt_main: Node, person_john_main, helper -): - schema = registry.schema.get_schema_branch(name=default_branch.name).duplicate() - person_schema = schema.get(name="TestPerson") - name_attr = person_schema.get_attribute(name="name") - name_attr.regex = r"^[A-Z]+$" - schema.set(name="TestPerson", schema=person_schema) - - constraints = [ - SchemaUpdateConstraintInfo( - constraint_name="attribute.regex.update", - path=SchemaPath(path_type=SchemaPathType.ATTRIBUTE, schema_kind="TestPerson", field_name="name"), - ) - ] - - bus_simulator = helper.get_message_bus_simulator() - service = InfrahubServices(message_bus=bus_simulator, client=InfrahubClient(), database=db) - bus_simulator.service = service - - errors, _ = await schema_validators_checker( - branch=default_branch, schema=schema, constraints=constraints, service=service - ) - - assert len(errors) == 1 - assert "is not compatible with the constraint 'attribute.regex.update'" in errors[0] From 6e71ef51b777164ecb6a9d186d09aa65008de169 Mon Sep 17 00:00:00 2001 From: Damien Garros Date: Mon, 11 Nov 2024 08:49:38 +0100 Subject: [PATCH 3/4] Refactor schema_validate_migrations to return a list of SchemaValidatorPathResponseData --- backend/infrahub/core/branch/tasks.py | 6 +++-- .../validators/models/validate_migration.py | 3 +++ backend/infrahub/core/validators/tasks.py | 22 +++++++++---------- backend/infrahub/graphql/mutations/tasks.py | 5 +++-- .../operations/requests/proposed_change.py | 20 +++++++++-------- 5 files changed, 31 insertions(+), 25 deletions(-) diff --git a/backend/infrahub/core/branch/tasks.py b/backend/infrahub/core/branch/tasks.py index 6e4ba6887a..e897d33b2f 100644 --- a/backend/infrahub/core/branch/tasks.py +++ b/backend/infrahub/core/branch/tasks.py @@ -78,10 +78,12 @@ async def rebase_branch(branch: str) -> None: if obj.has_schema_changes: constraints += await merger.calculate_validations(target_schema=candidate_schema) if constraints: - error_messages = await schema_validate_migrations( + responses = await schema_validate_migrations( message=SchemaValidateMigrationData(branch=obj, schema_branch=candidate_schema, constraints=constraints) ) - if error_messages: + + if responses: + error_messages = [violation.message for response in responses for violation in response.violations] raise ValidationError(",\n".join(error_messages)) schema_in_main_before = merger.destination_schema.duplicate() diff --git a/backend/infrahub/core/validators/models/validate_migration.py b/backend/infrahub/core/validators/models/validate_migration.py index 534d645cd2..7394deb628 100644 --- a/backend/infrahub/core/validators/models/validate_migration.py +++ b/backend/infrahub/core/validators/models/validate_migration.py @@ -21,3 +21,6 @@ class SchemaValidatorPathResponseData(InfrahubResponseData): violations: list[SchemaViolation] = Field(default_factory=list) constraint_name: str schema_path: SchemaPath + + def get_messages(self) -> list[str]: + return [violation.message for violation in self.violations] diff --git a/backend/infrahub/core/validators/tasks.py b/backend/infrahub/core/validators/tasks.py index da016faaee..81a935e238 100644 --- a/backend/infrahub/core/validators/tasks.py +++ b/backend/infrahub/core/validators/tasks.py @@ -2,6 +2,7 @@ from infrahub_sdk.batch import InfrahubBatch from prefect import flow, task +from prefect.logging import get_run_logger from infrahub.core.branch import Branch # noqa: TCH001 from infrahub.core.path import SchemaPath # noqa: TCH001 @@ -18,22 +19,19 @@ @flow(name="schema_validate_migrations", flow_run_name="Validate schema migrations") -async def schema_validate_migrations(message: SchemaValidateMigrationData) -> list[str]: +async def schema_validate_migrations(message: SchemaValidateMigrationData) -> list[SchemaValidatorPathResponseData]: batch = InfrahubBatch(return_exceptions=True) - error_messages: list[str] = [] - service = services.service + violations: list[SchemaValidatorPathResponseData] = [] + + log = get_run_logger() + await add_branch_tag(branch_name=message.branch.name) if not message.constraints: - return error_messages + return [] for constraint in message.constraints: - service.log.info( - f"Preparing validator for constraint {constraint.constraint_name!r} ({constraint.routing_key})", - branch=message.branch.name, - constraint_name=constraint.constraint_name, - routing_key=constraint.routing_key, - ) + log.info(f"Preparing validator for constraint {constraint.constraint_name!r} ({constraint.routing_key})") batch.add( task=schema_path_validate, @@ -45,9 +43,9 @@ async def schema_validate_migrations(message: SchemaValidateMigrationData) -> li async for _, result in batch.execute(): for violation in result.violations: - error_messages.append(violation.message) + violations.append(violation) - return error_messages + return violations @task( diff --git a/backend/infrahub/graphql/mutations/tasks.py b/backend/infrahub/graphql/mutations/tasks.py index 651c0c1347..946eb9118e 100644 --- a/backend/infrahub/graphql/mutations/tasks.py +++ b/backend/infrahub/graphql/mutations/tasks.py @@ -57,10 +57,11 @@ async def merge_branch_mutation(branch: str) -> None: constraints += await merger.calculate_validations(target_schema=candidate_schema) if constraints: - error_messages = await schema_validate_migrations( + responses = await schema_validate_migrations( message=SchemaValidateMigrationData(branch=obj, schema_branch=candidate_schema, constraints=constraints) ) - if error_messages: + if responses: + error_messages = [violation.message for response in responses for violation in response.violations] raise ValidationError(",\n".join(error_messages)) await service.workflow.execute_workflow(workflow=BRANCH_MERGE, parameters={"branch": obj.name}) diff --git a/backend/infrahub/message_bus/operations/requests/proposed_change.py b/backend/infrahub/message_bus/operations/requests/proposed_change.py index 3b43f61a58..178dad2208 100644 --- a/backend/infrahub/message_bus/operations/requests/proposed_change.py +++ b/backend/infrahub/message_bus/operations/requests/proposed_change.py @@ -10,6 +10,8 @@ from infrahub.core.diff.coordinator import DiffCoordinator from infrahub.core.registry import registry from infrahub.core.validators.determiner import ConstraintValidatorDeterminer +from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData +from infrahub.core.validators.tasks import schema_validate_migrations from infrahub.dependencies.registry import get_component_registry from infrahub.git.repository import InfrahubRepository from infrahub.log import get_logger @@ -36,8 +38,6 @@ REQUEST_PROPOSED_CHANGE_USER_TESTS, ) from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY -from infrahub.core.validators.tasks import schema_validate_migrations -from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData if TYPE_CHECKING: from infrahub_sdk.node import InfrahubNode @@ -232,21 +232,23 @@ async def schema_integrity( # Validate if the new schema is valid with the content of the database # ---------------------------------------------------------- source_branch = registry.get_branch_from_registry(branch=message.source_branch) - responses = await schema_validate_migrations(message=SchemaValidateMigrationData( - branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints) - )) + responses = await schema_validate_migrations( + message=SchemaValidateMigrationData( + branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints) + ) + ) # TODO we need to report a failure if an error happened during the execution of a validator conflicts: list[SchemaConflict] = [] for response in responses: - for violation in response.data.violations: + for violation in response.violations: conflicts.append( SchemaConflict( - name=response.data.schema_path.get_path(), - type=response.data.constraint_name, + name=response.schema_path.get_path(), + type=response.constraint_name, kind=violation.node_kind, id=violation.node_id, - path=response.data.schema_path.get_path(), + path=response.schema_path.get_path(), value=violation.message, branch="placeholder", ) From 1450381ddb6b5118d9c2cab16be1613377a581d8 Mon Sep 17 00:00:00 2001 From: Lucas Guillermou Date: Fri, 29 Nov 2024 16:01:20 +0100 Subject: [PATCH 4/4] Fix tests --- backend/infrahub/api/schema.py | 17 +- backend/infrahub/core/branch/tasks.py | 4 +- backend/infrahub/core/validators/tasks.py | 10 +- backend/infrahub/graphql/mutations/tasks.py | 4 +- .../operations/requests/proposed_change.py | 123 ------------- backend/infrahub/proposed_change/tasks.py | 17 +- .../request/test_proposed_change.py | 1 - backend/tests/unit/api/test_40_schema_api.py | 167 ------------------ .../test_task_schema_validate_migrations.py | 7 +- 9 files changed, 31 insertions(+), 319 deletions(-) diff --git a/backend/infrahub/api/schema.py b/backend/infrahub/api/schema.py index 8802af4d23..8dc2836eb6 100644 --- a/backend/infrahub/api/schema.py +++ b/backend/infrahub/api/schema.py @@ -27,7 +27,10 @@ ) from infrahub.core.schema import GenericSchema, MainSchemaTypes, NodeSchema, ProfileSchema, SchemaRoot from infrahub.core.schema.constants import SchemaNamespace # noqa: TCH001 -from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData +from infrahub.core.validators.models.validate_migration import ( + SchemaValidateMigrationData, + SchemaValidatorPathResponseData, +) from infrahub.database import InfrahubDatabase # noqa: TCH001 from infrahub.events import EventMeta from infrahub.events.schema_action import SchemaUpdatedEvent @@ -307,13 +310,14 @@ async def load_schema( schema_branch=candidate_schema, constraints=result.constraints, ) - error_messages = await service.workflow.execute_workflow( + responses = await service.workflow.execute_workflow( workflow=SCHEMA_VALIDATE_MIGRATION, - expected_return=list[str], + expected_return=list[SchemaValidatorPathResponseData], parameters={"message": validate_migration_data}, ) + error_messages = [violation.message for response in responses for violation in response.violations] if error_messages: - raise SchemaNotValidError(message=",\n".join(error_messages)) + raise SchemaNotValidError(",\n".join(error_messages)) # ---------------------------------------------------------- # Update the schema @@ -402,11 +406,12 @@ async def check_schema( schema_branch=candidate_schema, constraints=result.constraints, ) - error_messages = await service.workflow.execute_workflow( + responses = await service.workflow.execute_workflow( workflow=SCHEMA_VALIDATE_MIGRATION, - expected_return=list[str], + expected_return=list[SchemaValidatorPathResponseData], parameters={"message": validate_migration_data}, ) + error_messages = [violation.message for response in responses for violation in response.violations] if error_messages: raise SchemaNotValidError(message=",\n".join(error_messages)) diff --git a/backend/infrahub/core/branch/tasks.py b/backend/infrahub/core/branch/tasks.py index e897d33b2f..6c21481de2 100644 --- a/backend/infrahub/core/branch/tasks.py +++ b/backend/infrahub/core/branch/tasks.py @@ -82,8 +82,8 @@ async def rebase_branch(branch: str) -> None: message=SchemaValidateMigrationData(branch=obj, schema_branch=candidate_schema, constraints=constraints) ) - if responses: - error_messages = [violation.message for response in responses for violation in response.violations] + error_messages = [violation.message for response in responses for violation in response.violations] + if error_messages: raise ValidationError(",\n".join(error_messages)) schema_in_main_before = merger.destination_schema.duplicate() diff --git a/backend/infrahub/core/validators/tasks.py b/backend/infrahub/core/validators/tasks.py index 81a935e238..fe32741af5 100644 --- a/backend/infrahub/core/validators/tasks.py +++ b/backend/infrahub/core/validators/tasks.py @@ -21,10 +21,7 @@ @flow(name="schema_validate_migrations", flow_run_name="Validate schema migrations") async def schema_validate_migrations(message: SchemaValidateMigrationData) -> list[SchemaValidatorPathResponseData]: batch = InfrahubBatch(return_exceptions=True) - violations: list[SchemaValidatorPathResponseData] = [] - log = get_run_logger() - await add_branch_tag(branch_name=message.branch.name) if not message.constraints: @@ -41,11 +38,8 @@ async def schema_validate_migrations(message: SchemaValidateMigrationData) -> li schema_path=constraint.path, ) - async for _, result in batch.execute(): - for violation in result.violations: - violations.append(violation) - - return violations + results = [result async for _, result in batch.execute()] + return results @task( diff --git a/backend/infrahub/graphql/mutations/tasks.py b/backend/infrahub/graphql/mutations/tasks.py index 946eb9118e..3a6292b053 100644 --- a/backend/infrahub/graphql/mutations/tasks.py +++ b/backend/infrahub/graphql/mutations/tasks.py @@ -60,8 +60,8 @@ async def merge_branch_mutation(branch: str) -> None: responses = await schema_validate_migrations( message=SchemaValidateMigrationData(branch=obj, schema_branch=candidate_schema, constraints=constraints) ) - if responses: - error_messages = [violation.message for response in responses for violation in response.violations] + error_messages = [violation.message for response in responses for violation in response.violations] + if error_messages: raise ValidationError(",\n".join(error_messages)) await service.workflow.execute_workflow(workflow=BRANCH_MERGE, parameters={"branch": obj.name}) diff --git a/backend/infrahub/message_bus/operations/requests/proposed_change.py b/backend/infrahub/message_bus/operations/requests/proposed_change.py index 178dad2208..6bbc25d113 100644 --- a/backend/infrahub/message_bus/operations/requests/proposed_change.py +++ b/backend/infrahub/message_bus/operations/requests/proposed_change.py @@ -9,9 +9,6 @@ from infrahub.core.constants import CheckType, InfrahubKind, RepositoryInternalStatus from infrahub.core.diff.coordinator import DiffCoordinator from infrahub.core.registry import registry -from infrahub.core.validators.determiner import ConstraintValidatorDeterminer -from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData -from infrahub.core.validators.tasks import schema_validate_migrations from infrahub.dependencies.registry import get_component_registry from infrahub.git.repository import InfrahubRepository from infrahub.log import get_logger @@ -37,14 +34,6 @@ REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY, REQUEST_PROPOSED_CHANGE_USER_TESTS, ) -from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY - -if TYPE_CHECKING: - from infrahub_sdk.node import InfrahubNode - - from infrahub.core.models import SchemaUpdateConstraintInfo - from infrahub.core.schema.schema_branch import SchemaBranch - log = get_logger() @@ -199,118 +188,6 @@ async def pipeline(message: messages.RequestProposedChangePipeline, service: Inf name="proposed-changed-refresh-artifact", flow_run_name="Refreshing artifacts for change_proposal={message.proposed_change}", ) -@flow(name="proposed-changed-schema-integrity") -async def schema_integrity( - message: messages.RequestProposedChangeSchemaIntegrity, - service: InfrahubServices, # pylint: disable=unused-argument -) -> None: - async with service.task_report( - related_node=message.proposed_change, - title="Schema Integrity", - ): - log.info(f"Got a request to process schema integrity defined in proposed_change: {message.proposed_change}") - - # For now, we retrieve the latest schema for each branch from the registry - # In the future it would be good to generate the object SchemaUpdateValidationResult from message.branch_diff - source_schema = registry.schema.get_schema_branch(name=message.source_branch).duplicate() - dest_schema = registry.schema.get_schema_branch(name=message.destination_branch).duplicate() - - candidate_schema = dest_schema.duplicate() - candidate_schema.update(schema=source_schema) - validation_result = dest_schema.validate_update(other=candidate_schema) - - constraints_from_data_diff = await _get_proposed_change_schema_integrity_constraints( - message=message, schema=candidate_schema - ) - constraints_from_schema_diff = validation_result.constraints - constraints = set(constraints_from_data_diff + constraints_from_schema_diff) - - if not constraints: - return - - # ---------------------------------------------------------- - # Validate if the new schema is valid with the content of the database - # ---------------------------------------------------------- - source_branch = registry.get_branch_from_registry(branch=message.source_branch) - responses = await schema_validate_migrations( - message=SchemaValidateMigrationData( - branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints) - ) - ) - - # TODO we need to report a failure if an error happened during the execution of a validator - conflicts: list[SchemaConflict] = [] - for response in responses: - for violation in response.violations: - conflicts.append( - SchemaConflict( - name=response.schema_path.get_path(), - type=response.constraint_name, - kind=violation.node_kind, - id=violation.node_id, - path=response.schema_path.get_path(), - value=violation.message, - branch="placeholder", - ) - ) - - if not conflicts: - return - - async with service.database.start_transaction() as db: - object_conflict_validator_recorder = ObjectConflictValidatorRecorder( - db=db, - validator_kind=InfrahubKind.SCHEMAVALIDATOR, - validator_label="Schema Integrity", - check_schema_kind=InfrahubKind.SCHEMACHECK, - ) - await object_conflict_validator_recorder.record_conflicts( - proposed_change_id=message.proposed_change, conflicts=conflicts - ) - - -@flow(name="proposed-changed-repository-check") -async def repository_checks(message: messages.RequestProposedChangeRepositoryChecks, service: InfrahubServices) -> None: - async with service.task_report( - related_node=message.proposed_change, - title=f"Evaluating Repository Checks {len(message.branch_diff.repositories)} repositories", - ) as task_report: - log.info(f"Got a request to process checks defined in proposed_change: {message.proposed_change}") - events: list[InfrahubMessage] = [] - for repository in message.branch_diff.repositories: - log_line = "Skipping merge conflict checks for data only branch" - if ( - message.source_branch_sync_with_git - and not repository.read_only - and repository.internal_status == RepositoryInternalStatus.ACTIVE.value - ): - events.append( - messages.RequestRepositoryChecks( - proposed_change=message.proposed_change, - repository=repository.repository_id, - source_branch=message.source_branch, - target_branch=message.destination_branch, - ) - ) - log_line = "Requesting merge conflict checks" - await task_report.info(f"{repository.repository_name}: {log_line}") - events.append( - messages.RequestRepositoryUserChecks( - proposed_change=message.proposed_change, - repository=repository.repository_id, - source_branch=message.source_branch, - source_branch_sync_with_git=message.source_branch_sync_with_git, - target_branch=message.destination_branch, - branch_diff=message.branch_diff, - ) - ) - await task_report.info(f"{repository.repository_name}: Requesting user checks") - for event in events: - event.assign_meta(parent=message) - await service.send(message=event) - - -@flow(name="proposed-changed-refresh-artifact") async def refresh_artifacts(message: messages.RequestProposedChangeRefreshArtifacts, service: InfrahubServices) -> None: definition_information = await service.client.execute_graphql( query=GATHER_ARTIFACT_DEFINITIONS, diff --git a/backend/infrahub/proposed_change/tasks.py b/backend/infrahub/proposed_change/tasks.py index 8b61cf15a9..fd2efb092c 100644 --- a/backend/infrahub/proposed_change/tasks.py +++ b/backend/infrahub/proposed_change/tasks.py @@ -26,8 +26,9 @@ from infrahub.core.integrity.object_conflict.conflict_recorder import ObjectConflictValidatorRecorder from infrahub.core.protocols import CoreDataCheck, CoreValidator from infrahub.core.protocols import CoreProposedChange as InternalCoreProposedChange -from infrahub.core.validators.checker import schema_validators_checker from infrahub.core.validators.determiner import ConstraintValidatorDeterminer +from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData +from infrahub.core.validators.tasks import schema_validate_migrations from infrahub.dependencies.registry import get_component_registry from infrahub.generators.models import ProposedChangeGeneratorDefinition from infrahub.git.repository import get_initialized_repo @@ -319,21 +320,23 @@ async def run_proposed_change_schema_integrity_check( # Validate if the new schema is valid with the content of the database # ---------------------------------------------------------- source_branch = registry.get_branch_from_registry(branch=model.source_branch) - _, responses = await schema_validators_checker( - branch=source_branch, schema=candidate_schema, constraints=list(constraints), service=service + responses = await schema_validate_migrations( + message=SchemaValidateMigrationData( + branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints) + ) ) # TODO we need to report a failure if an error happened during the execution of a validator conflicts: list[SchemaConflict] = [] for response in responses: - for violation in response.data.violations: + for violation in response.violations: conflicts.append( SchemaConflict( - name=response.data.schema_path.get_path(), - type=response.data.constraint_name, + name=response.schema_path.get_path(), + type=response.constraint_name, kind=violation.node_kind, id=violation.node_id, - path=response.data.schema_path.get_path(), + path=response.schema_path.get_path(), value=violation.message, branch="placeholder", ) diff --git a/backend/tests/integration/message_bus/operations/request/test_proposed_change.py b/backend/tests/integration/message_bus/operations/request/test_proposed_change.py index b8093094f8..4ed148a567 100644 --- a/backend/tests/integration/message_bus/operations/request/test_proposed_change.py +++ b/backend/tests/integration/message_bus/operations/request/test_proposed_change.py @@ -159,7 +159,6 @@ async def test_run_pipeline_validate_requested_jobs( assert sorted(bus_post_data_changes.seen_routing_keys) == [ "request.proposed_change.refresh_artifacts", "request.repository.user_checks", - "schema.validator.path", ] diff --git a/backend/tests/unit/api/test_40_schema_api.py b/backend/tests/unit/api/test_40_schema_api.py index 05c0fd657e..12ad155e23 100644 --- a/backend/tests/unit/api/test_40_schema_api.py +++ b/backend/tests/unit/api/test_40_schema_api.py @@ -6,7 +6,6 @@ from infrahub.core.initialization import create_branch from infrahub.core.node import Node from infrahub.core.schema import SchemaRoot, core_models -from infrahub.core.utils import count_relationships from infrahub.database import InfrahubDatabase @@ -231,172 +230,6 @@ async def test_schema_load_restricted_namespace( assert response.json()["errors"][0]["message"] == "Restricted namespace 'Internal' used on 'Timestamp'" -async def test_schema_load_endpoint_idempotent_simple( - db: InfrahubDatabase, - client: TestClient, - admin_headers, - default_branch: Branch, - prefect_test_fixture, - workflow_local, - register_core_schema_db, - authentication_base, - helper, -): - # Load the schema in the database - schema = registry.schema.get_schema_branch(name=default_branch.name) - await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) - - # Must execute in a with block to execute the startup/shutdown events - with client: - creation = client.post( - "/api/schema/load", headers=admin_headers, json={"schemas": [helper.schema_file("infra_simple_01.json")]} - ) - read = client.get("/api/schema", headers=admin_headers) - - nbr_rels = await count_relationships(db=db) - - assert creation.status_code == 200 - assert read.status_code == 200 - nodes = read.json()["nodes"] - device = [node for node in nodes if node["name"] == "Device"] - assert device - device = device[0] - attributes = {attrib["name"]: attrib["order_weight"] for attrib in device["attributes"]} - relationships = {attrib["name"]: attrib["order_weight"] for attrib in device["relationships"]} - assert attributes["name"] == 1000 - assert attributes["description"] == 900 - assert attributes["type"] == 3000 - assert relationships["interfaces"] == 450 - assert relationships["tags"] == 7000 - - creation = client.post( - "/api/schema/load", headers=admin_headers, json={"schemas": [helper.schema_file("infra_simple_01.json")]} - ) - read = client.get("/api/schema", headers=admin_headers) - - assert creation.status_code == 200 - assert read.status_code == 200 - - assert nbr_rels == await count_relationships(db=db) - - -async def test_schema_load_endpoint_valid_with_generics( - db: InfrahubDatabase, - client: TestClient, - admin_headers, - default_branch: Branch, - prefect_test_fixture, - workflow_local, - register_core_schema_db, - authentication_base, - helper, -): - # Load the schema in the database - schema = registry.schema.get_schema_branch(name=default_branch.name) - await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) - - # Must execute in a with block to execute the startup/shutdown events - with client: - response1 = client.post( - "/api/schema/load", - headers=admin_headers, - json={"schemas": [helper.schema_file("infra_w_generics_01.json")]}, - ) - assert response1.status_code == 200 - - response2 = client.get("/api/schema", headers=admin_headers) - assert response2.status_code == 200 - - schema = response2.json() - assert len(schema["generics"]) == len(core_models.get("generics")) + 1 - - -async def test_schema_load_endpoint_idempotent_with_generics( - db: InfrahubDatabase, - client: TestClient, - admin_headers, - default_branch: Branch, - prefect_test_fixture, - workflow_local, - register_core_schema_db, - authentication_base, - helper, -): - # Load the schema in the database - schema = registry.schema.get_schema_branch(name=default_branch.name) - await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) - - # Must execute in a with block to execute the startup/shutdown events - with client: - response1 = client.post( - "/api/schema/load", - headers=admin_headers, - json={"schemas": [helper.schema_file("infra_w_generics_01.json")]}, - ) - assert response1.json()["schema_updated"] - assert response1.status_code == 200 - - response2 = client.get("/api/schema", headers=admin_headers) - assert response2.status_code == 200 - - schema = response2.json() - assert len(schema["generics"]) == len(core_models.get("generics")) + 1 - - nbr_rels = await count_relationships(db=db) - - response3 = client.post( - "/api/schema/load", - headers=admin_headers, - json={"schemas": [helper.schema_file("infra_w_generics_01.json")]}, - ) - assert response3.json()["schema_updated"] is False - assert response3.status_code == 200 - - response4 = client.get("/api/schema", headers=admin_headers) - assert response4.status_code == 200 - - nbr_rels_after = await count_relationships(db=db) - assert nbr_rels == nbr_rels_after - - -async def test_schema_load_endpoint_valid_with_extensions( - db: InfrahubDatabase, - client: TestClient, - admin_headers, - rpc_bus, - default_branch: Branch, - prefect_test_fixture, - workflow_local, - authentication_base, - helper, -): - # Load the schema in the database - schema = registry.schema.get_schema_branch(name=default_branch.name) - await registry.schema.load_schema_to_db(schema=schema, branch=default_branch, db=db) - - org_schema = registry.schema.get(name="CoreOrganization", branch=default_branch.name) - initial_nbr_relationships = len(org_schema.relationships) - - schema = registry.schema.get_schema_branch(name=default_branch.name) - await registry.schema.load_schema_to_db( - db=db, schema=schema, branch=default_branch, limit=["CoreOrganization", "InfraSite"] - ) - - # Must execute in a with block to execute the startup/shutdown events - with client: - response = client.post( - "/api/schema/load", - headers=admin_headers, - json={"schemas": [helper.schema_file("infra_w_extensions_01.json")]}, - ) - - assert response.json()["schema_updated"] - assert response.status_code == 200 - - org_schema = registry.schema.get(name="CoreOrganization", branch=default_branch.name) - assert len(org_schema.relationships) == initial_nbr_relationships + 1 - - async def test_schema_load_endpoint_not_valid_simple_02( db: InfrahubDatabase, client: TestClient, diff --git a/backend/tests/unit/core/constraint_validators/test_task_schema_validate_migrations.py b/backend/tests/unit/core/constraint_validators/test_task_schema_validate_migrations.py index 260e349cae..a2a34febd1 100644 --- a/backend/tests/unit/core/constraint_validators/test_task_schema_validate_migrations.py +++ b/backend/tests/unit/core/constraint_validators/test_task_schema_validate_migrations.py @@ -41,9 +41,10 @@ async def test_schema_validate_migrations( ) message = SchemaValidateMigrationData(branch=default_branch, schema_branch=schema, constraints=constraints) - errors = await schema_validate_migrations( + responses = await schema_validate_migrations( message=message, ) - assert len(errors) == 1 - assert "is not compatible with the constraint 'attribute.regex.update'" in errors[0] + assert len(responses) == 1 + assert len(responses[0].violations) == 1 + assert "is not compatible with the constraint 'attribute.regex.update'" in responses[0].violations[0].message