Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasG0 committed Dec 2, 2024
1 parent 6e71ef5 commit 1450381
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 319 deletions.
17 changes: 11 additions & 6 deletions backend/infrahub/api/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
)
from infrahub.core.schema import GenericSchema, MainSchemaTypes, NodeSchema, ProfileSchema, SchemaRoot
from infrahub.core.schema.constants import SchemaNamespace # noqa: TCH001
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.core.validators.models.validate_migration import (
SchemaValidateMigrationData,
SchemaValidatorPathResponseData,
)
from infrahub.database import InfrahubDatabase # noqa: TCH001
from infrahub.events import EventMeta
from infrahub.events.schema_action import SchemaUpdatedEvent
Expand Down Expand Up @@ -307,13 +310,14 @@ async def load_schema(
schema_branch=candidate_schema,
constraints=result.constraints,
)
error_messages = await service.workflow.execute_workflow(
responses = await service.workflow.execute_workflow(
workflow=SCHEMA_VALIDATE_MIGRATION,
expected_return=list[str],
expected_return=list[SchemaValidatorPathResponseData],
parameters={"message": validate_migration_data},
)
error_messages = [violation.message for response in responses for violation in response.violations]
if error_messages:
raise SchemaNotValidError(message=",\n".join(error_messages))
raise SchemaNotValidError(",\n".join(error_messages))

# ----------------------------------------------------------
# Update the schema
Expand Down Expand Up @@ -402,11 +406,12 @@ async def check_schema(
schema_branch=candidate_schema,
constraints=result.constraints,
)
error_messages = await service.workflow.execute_workflow(
responses = await service.workflow.execute_workflow(
workflow=SCHEMA_VALIDATE_MIGRATION,
expected_return=list[str],
expected_return=list[SchemaValidatorPathResponseData],
parameters={"message": validate_migration_data},
)
error_messages = [violation.message for response in responses for violation in response.violations]
if error_messages:
raise SchemaNotValidError(message=",\n".join(error_messages))

Expand Down
4 changes: 2 additions & 2 deletions backend/infrahub/core/branch/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ async def rebase_branch(branch: str) -> None:
message=SchemaValidateMigrationData(branch=obj, schema_branch=candidate_schema, constraints=constraints)
)

if responses:
error_messages = [violation.message for response in responses for violation in response.violations]
error_messages = [violation.message for response in responses for violation in response.violations]
if error_messages:
raise ValidationError(",\n".join(error_messages))

schema_in_main_before = merger.destination_schema.duplicate()
Expand Down
10 changes: 2 additions & 8 deletions backend/infrahub/core/validators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
@flow(name="schema_validate_migrations", flow_run_name="Validate schema migrations")
async def schema_validate_migrations(message: SchemaValidateMigrationData) -> list[SchemaValidatorPathResponseData]:
batch = InfrahubBatch(return_exceptions=True)
violations: list[SchemaValidatorPathResponseData] = []

log = get_run_logger()

await add_branch_tag(branch_name=message.branch.name)

if not message.constraints:
Expand All @@ -41,11 +38,8 @@ async def schema_validate_migrations(message: SchemaValidateMigrationData) -> li
schema_path=constraint.path,
)

async for _, result in batch.execute():
for violation in result.violations:
violations.append(violation)

return violations
results = [result async for _, result in batch.execute()]
return results


@task(
Expand Down
4 changes: 2 additions & 2 deletions backend/infrahub/graphql/mutations/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ async def merge_branch_mutation(branch: str) -> None:
responses = await schema_validate_migrations(
message=SchemaValidateMigrationData(branch=obj, schema_branch=candidate_schema, constraints=constraints)
)
if responses:
error_messages = [violation.message for response in responses for violation in response.violations]
error_messages = [violation.message for response in responses for violation in response.violations]
if error_messages:
raise ValidationError(",\n".join(error_messages))

await service.workflow.execute_workflow(workflow=BRANCH_MERGE, parameters={"branch": obj.name})
123 changes: 0 additions & 123 deletions backend/infrahub/message_bus/operations/requests/proposed_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
from infrahub.core.constants import CheckType, InfrahubKind, RepositoryInternalStatus
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.registry import registry
from infrahub.core.validators.determiner import ConstraintValidatorDeterminer
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.core.validators.tasks import schema_validate_migrations
from infrahub.dependencies.registry import get_component_registry
from infrahub.git.repository import InfrahubRepository
from infrahub.log import get_logger
Expand All @@ -37,14 +34,6 @@
REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY,
REQUEST_PROPOSED_CHANGE_USER_TESTS,
)
from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY

if TYPE_CHECKING:
from infrahub_sdk.node import InfrahubNode

from infrahub.core.models import SchemaUpdateConstraintInfo
from infrahub.core.schema.schema_branch import SchemaBranch


log = get_logger()

Expand Down Expand Up @@ -199,118 +188,6 @@ async def pipeline(message: messages.RequestProposedChangePipeline, service: Inf
name="proposed-changed-refresh-artifact",
flow_run_name="Refreshing artifacts for change_proposal={message.proposed_change}",
)
@flow(name="proposed-changed-schema-integrity")
async def schema_integrity(
message: messages.RequestProposedChangeSchemaIntegrity,
service: InfrahubServices, # pylint: disable=unused-argument
) -> None:
async with service.task_report(
related_node=message.proposed_change,
title="Schema Integrity",
):
log.info(f"Got a request to process schema integrity defined in proposed_change: {message.proposed_change}")

# For now, we retrieve the latest schema for each branch from the registry
# In the future it would be good to generate the object SchemaUpdateValidationResult from message.branch_diff
source_schema = registry.schema.get_schema_branch(name=message.source_branch).duplicate()
dest_schema = registry.schema.get_schema_branch(name=message.destination_branch).duplicate()

candidate_schema = dest_schema.duplicate()
candidate_schema.update(schema=source_schema)
validation_result = dest_schema.validate_update(other=candidate_schema)

constraints_from_data_diff = await _get_proposed_change_schema_integrity_constraints(
message=message, schema=candidate_schema
)
constraints_from_schema_diff = validation_result.constraints
constraints = set(constraints_from_data_diff + constraints_from_schema_diff)

if not constraints:
return

# ----------------------------------------------------------
# Validate if the new schema is valid with the content of the database
# ----------------------------------------------------------
source_branch = registry.get_branch_from_registry(branch=message.source_branch)
responses = await schema_validate_migrations(
message=SchemaValidateMigrationData(
branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints)
)
)

# TODO we need to report a failure if an error happened during the execution of a validator
conflicts: list[SchemaConflict] = []
for response in responses:
for violation in response.violations:
conflicts.append(
SchemaConflict(
name=response.schema_path.get_path(),
type=response.constraint_name,
kind=violation.node_kind,
id=violation.node_id,
path=response.schema_path.get_path(),
value=violation.message,
branch="placeholder",
)
)

if not conflicts:
return

async with service.database.start_transaction() as db:
object_conflict_validator_recorder = ObjectConflictValidatorRecorder(
db=db,
validator_kind=InfrahubKind.SCHEMAVALIDATOR,
validator_label="Schema Integrity",
check_schema_kind=InfrahubKind.SCHEMACHECK,
)
await object_conflict_validator_recorder.record_conflicts(
proposed_change_id=message.proposed_change, conflicts=conflicts
)


@flow(name="proposed-changed-repository-check")
async def repository_checks(message: messages.RequestProposedChangeRepositoryChecks, service: InfrahubServices) -> None:
async with service.task_report(
related_node=message.proposed_change,
title=f"Evaluating Repository Checks {len(message.branch_diff.repositories)} repositories",
) as task_report:
log.info(f"Got a request to process checks defined in proposed_change: {message.proposed_change}")
events: list[InfrahubMessage] = []
for repository in message.branch_diff.repositories:
log_line = "Skipping merge conflict checks for data only branch"
if (
message.source_branch_sync_with_git
and not repository.read_only
and repository.internal_status == RepositoryInternalStatus.ACTIVE.value
):
events.append(
messages.RequestRepositoryChecks(
proposed_change=message.proposed_change,
repository=repository.repository_id,
source_branch=message.source_branch,
target_branch=message.destination_branch,
)
)
log_line = "Requesting merge conflict checks"
await task_report.info(f"{repository.repository_name}: {log_line}")
events.append(
messages.RequestRepositoryUserChecks(
proposed_change=message.proposed_change,
repository=repository.repository_id,
source_branch=message.source_branch,
source_branch_sync_with_git=message.source_branch_sync_with_git,
target_branch=message.destination_branch,
branch_diff=message.branch_diff,
)
)
await task_report.info(f"{repository.repository_name}: Requesting user checks")
for event in events:
event.assign_meta(parent=message)
await service.send(message=event)


@flow(name="proposed-changed-refresh-artifact")
async def refresh_artifacts(message: messages.RequestProposedChangeRefreshArtifacts, service: InfrahubServices) -> None:
definition_information = await service.client.execute_graphql(
query=GATHER_ARTIFACT_DEFINITIONS,
Expand Down
17 changes: 10 additions & 7 deletions backend/infrahub/proposed_change/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
from infrahub.core.integrity.object_conflict.conflict_recorder import ObjectConflictValidatorRecorder
from infrahub.core.protocols import CoreDataCheck, CoreValidator
from infrahub.core.protocols import CoreProposedChange as InternalCoreProposedChange
from infrahub.core.validators.checker import schema_validators_checker
from infrahub.core.validators.determiner import ConstraintValidatorDeterminer
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.core.validators.tasks import schema_validate_migrations
from infrahub.dependencies.registry import get_component_registry
from infrahub.generators.models import ProposedChangeGeneratorDefinition
from infrahub.git.repository import get_initialized_repo
Expand Down Expand Up @@ -319,21 +320,23 @@ async def run_proposed_change_schema_integrity_check(
# Validate if the new schema is valid with the content of the database
# ----------------------------------------------------------
source_branch = registry.get_branch_from_registry(branch=model.source_branch)
_, responses = await schema_validators_checker(
branch=source_branch, schema=candidate_schema, constraints=list(constraints), service=service
responses = await schema_validate_migrations(
message=SchemaValidateMigrationData(
branch=source_branch, schema_branch=candidate_schema, constraints=list(constraints)
)
)

# TODO we need to report a failure if an error happened during the execution of a validator
conflicts: list[SchemaConflict] = []
for response in responses:
for violation in response.data.violations:
for violation in response.violations:
conflicts.append(
SchemaConflict(
name=response.data.schema_path.get_path(),
type=response.data.constraint_name,
name=response.schema_path.get_path(),
type=response.constraint_name,
kind=violation.node_kind,
id=violation.node_id,
path=response.data.schema_path.get_path(),
path=response.schema_path.get_path(),
value=violation.message,
branch="placeholder",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def test_run_pipeline_validate_requested_jobs(
assert sorted(bus_post_data_changes.seen_routing_keys) == [
"request.proposed_change.refresh_artifacts",
"request.repository.user_checks",
"schema.validator.path",
]


Expand Down
Loading

0 comments on commit 1450381

Please sign in to comment.