Skip to content

Commit

Permalink
SCKAN-275 feat: Update ingestion script
Browse files: browse the repository at this point in the history
  • Loading branch information
afonsobspinto committed Mar 19, 2024
1 parent 776237e commit 253d241
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 36 deletions.
8 changes: 7 additions & 1 deletion backend/composer/management/commands/ingest_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ def add_arguments(self, parser):
action='store_true',
help='Set this flag to update upstream statements.',
)
parser.add_argument(
'--update_anatomic_entities',
action='store_true',
help='Set this flag to try move anatomical entities to specific layer, region.',
)

def handle(self, *args, **options):
update_upstream = options['update_upstream']
update_anatomic_entities = options['update_anatomic_entities']

start_time = time.time()

ingest_statements(update_upstream)
ingest_statements(update_upstream, update_anatomic_entities)

end_time = time.time()

Expand Down
119 changes: 84 additions & 35 deletions backend/composer/services/cs_ingestion/cs_ingestion_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Set, Any

from django.db import transaction
from django.db import transaction, IntegrityError
from neurondm import orders

from composer.models import AnatomicalEntity, Sentence, ConnectivityStatement, Sex, FunctionalCircuitRole, \
ProjectionPhenotype, Phenotype, Specie, Provenance, Via, Note, User, Destination, Region, Layer
ProjectionPhenotype, Phenotype, Specie, Provenance, Via, Note, User, Destination, Region, Layer, \
AnatomicalEntityIntersection, AnatomicalEntityMeta
from .exceptions import EntityNotFoundException
from .helpers import get_value_or_none, found_entity, \
ORIGINS, DESTINATIONS, VIAS, LABEL, SEX, SPECIES, ID, FORWARD_CONNECTION, SENTENCE_NUMBER, \
Expand All @@ -25,7 +26,7 @@
logger_service = LoggerService()


def ingest_statements(update_upstream=False):
def ingest_statements(update_upstream=False, update_anatomic_entities=False):
statements_list = get_statements_from_neurondm(logger_service_param=logger_service)
overridable_statements = get_overwritable_statements(statements_list)
statements = validate_statements(overridable_statements)
Expand All @@ -35,8 +36,8 @@ def ingest_statements(update_upstream=False):
with transaction.atomic():
for statement in statements:
sentence, _ = get_or_create_sentence(statement)
create_or_update_connectivity_statement(statement, sentence)

create_or_update_connectivity_statement(statement, sentence, update_anatomic_entities)
# TODO: Should an error in one statement stop the whole ingestion?
update_forward_connections(statements)
except Exception as e:
logger_service.add_anomaly(
Expand Down Expand Up @@ -109,6 +110,8 @@ def validate_statements(statements: List[Dict[str, Any]]) -> List[Dict[str, Any]
# Validate forward connection
annotate_invalid_forward_connections(statement, statement_ids)

# TODO: Validate anatomical entities

return statements


Expand Down Expand Up @@ -193,7 +196,8 @@ def get_or_create_sentence(statement: Dict) -> Tuple[Sentence, bool]:
return sentence, created


def create_or_update_connectivity_statement(statement: Dict, sentence: Sentence) -> Tuple[ConnectivityStatement, bool]:
def create_or_update_connectivity_statement(statement: Dict, sentence: Sentence, update_anatomic_entities: bool) -> \
Tuple[ConnectivityStatement, bool]:
reference_uri = statement[ID]
defaults = {
"sentence": sentence,
Expand Down Expand Up @@ -227,7 +231,7 @@ def create_or_update_connectivity_statement(statement: Dict, sentence: Sentence)
else:
create_invalid_note(connectivity_statement, error_message)

update_many_to_many_fields(connectivity_statement, statement)
update_many_to_many_fields(connectivity_statement, statement, update_anatomic_entities)
statement[STATE] = connectivity_statement.state

return connectivity_statement, created
Expand Down Expand Up @@ -401,7 +405,8 @@ def create_invalid_note(connectivity_statement: ConnectivityStatement, note: str
)


def update_many_to_many_fields(connectivity_statement: ConnectivityStatement, statement: Dict):
def update_many_to_many_fields(connectivity_statement: ConnectivityStatement, statement: Dict,
update_anatomic_entities: bool):
connectivity_statement.origins.clear()
connectivity_statement.species.clear()
# Notes are not cleared because they should be kept
Expand All @@ -415,76 +420,119 @@ def update_many_to_many_fields(connectivity_statement: ConnectivityStatement, st
for via in connectivity_statement.via_set.all():
via.delete()

add_origins(connectivity_statement, statement)
add_vias(connectivity_statement, statement)
add_destinations(connectivity_statement, statement)
add_origins(connectivity_statement, statement, update_anatomic_entities)
add_vias(connectivity_statement, statement, update_anatomic_entities)
add_destinations(connectivity_statement, statement, update_anatomic_entities)
add_species(connectivity_statement, statement)
add_provenances(connectivity_statement, statement)
add_notes(connectivity_statement, statement)


def add_origins(connectivity_statement: ConnectivityStatement, statement: Dict):
def add_origins(connectivity_statement: ConnectivityStatement, statement: Dict, update_anatomic_entities: bool):
for entity in statement[ORIGINS].anatomical_entities:
try:
add_entity_to_instance(connectivity_statement, 'origins', entity)
add_entity_to_instance(connectivity_statement, 'origins', entity, update_anatomic_entities)
except (EntityNotFoundException, AnatomicalEntity.DoesNotExist):
assert connectivity_statement.state == CSState.INVALID
assert connectivity_statement.state == CSState.INVALID, f"connectivity_statement {connectivity_statement} should be invalid due to entity {entity} not found but it isn't"
except IntegrityError as e:
raise e


def add_vias(connectivity_statement: ConnectivityStatement, statement: Dict):
def add_vias(connectivity_statement: ConnectivityStatement, statement: Dict, update_anatomic_entities: bool):
for neurondm_via in statement[VIAS]:
via_instance = Via.objects.create(connectivity_statement=connectivity_statement,
type=neurondm_via.type,
order=neurondm_via.order)
add_entities_to_connection(via_instance,
neurondm_via.anatomical_entities,
neurondm_via.from_entities,
connectivity_statement)
connectivity_statement, update_anatomic_entities)


def add_destinations(connectivity_statement: ConnectivityStatement, statement: Dict):
def add_destinations(connectivity_statement: ConnectivityStatement, statement: Dict, update_anatomic_entities: bool):
for neurondm_destination in statement[DESTINATIONS]:
destination_instance = Destination.objects.create(connectivity_statement=connectivity_statement,
type=neurondm_destination.type)
add_entities_to_connection(destination_instance,
neurondm_destination.anatomical_entities,
neurondm_destination.from_entities,
connectivity_statement)
connectivity_statement, update_anatomic_entities)


def add_entities_to_connection(instance, anatomical_entities, from_entities, connectivity_statement):
def add_entities_to_connection(instance, anatomical_entities, from_entities, connectivity_statement,
update_anatomic_entities: bool):
try:
for entity in anatomical_entities:
add_entity_to_instance(instance, 'anatomical_entities', entity)
add_entity_to_instance(instance, 'anatomical_entities', entity, update_anatomic_entities)

for entity in from_entities:
add_entity_to_instance(instance, 'from_entities', entity)
add_entity_to_instance(instance, 'from_entities', entity, update_anatomic_entities)

except (EntityNotFoundException, AnatomicalEntity.DoesNotExist):
assert connectivity_statement.state == CSState.INVALID
assert connectivity_statement.state == CSState.INVALID, \
f"connectivity_statement {connectivity_statement} should be invalid due to entity {entity} not found but it isn't"
except IntegrityError as e:
raise e


def add_entity_to_instance(instance, entity_field, entity):
def add_entity_to_instance(instance, entity_field, entity, update_anatomic_entities: bool):
if isinstance(entity, orders.rl):
region, _ = get_or_create_complex_entity(entity.region, entity.layer)
getattr(instance, entity_field).add(region)
complex_anatomical_entity, _ = get_or_create_complex_entity(str(entity.region), str(entity.layer),
update_anatomic_entities)
getattr(instance, entity_field).add(complex_anatomical_entity)
else:
anatomical_entity = AnatomicalEntity.objects.filter(ontology_uri=str(entity)).first()
getattr(instance, entity_field).add(anatomical_entity)


def get_or_create_complex_entity(region_uri, layer_uri):
region_entity = AnatomicalEntity.objects.filter(ontology_uri=region_uri).first()
layer_entity = AnatomicalEntity.objects.filter(ontology_uri=layer_uri).first()
def get_or_create_complex_entity(region_uri, layer_uri, update_anatomical_entities=False):
layer = Layer.objects.filter(ontology_uri=layer_uri).first()
region = Region.objects.filter(ontology_uri=region_uri).first()

if update_anatomical_entities:
if not layer:
layer_meta = AnatomicalEntityMeta.objects.filter(ontology_uri=layer_uri).first()
if layer_meta:
layer, _ = convert_anatomical_entity_to_specific_type(layer_meta, Layer)
else:
raise EntityNotFoundException(f"Layer not found for URI: {layer_uri}")

if not region:
region_meta = AnatomicalEntityMeta.objects.filter(ontology_uri=region_uri).first()
if region_meta:
region, _ = convert_anatomical_entity_to_specific_type(region_meta, Region)
else:
raise EntityNotFoundException(f"Region not found for URI: {region_uri}")
else:
if not layer or not region:
raise EntityNotFoundException("Required Layer or Region not found and update not permitted.")

intersection, _ = AnatomicalEntityIntersection.objects.get_or_create(layer=layer, region=region)
anatomical_entity, created = AnatomicalEntity.objects.get_or_create(region_layer=intersection)

if not region_entity or not layer_entity:
raise EntityNotFoundException(f"Region or layer not found for URIs: {region_uri}, {layer_uri}")
return anatomical_entity, created


layer, _ = Layer.objects.get_or_create(ontology_uri=layer_uri, defaults={'name': layer_entity.name})
region, _ = Region.objects.get_or_create(ontology_uri=region_uri,
defaults={'name': region_entity.name, 'associated_layer': layer})
return region, layer
def convert_anatomical_entity_to_specific_type(entity_meta, target_model):
    """
    Convert an AnatomicalEntityMeta instance to a more specific type (Layer or Region).

    The original instance is deleted and the replacement is fetched or created
    inside a single ``transaction.atomic()`` block, so the two steps are applied
    together or not at all.

    Args:
        entity_meta: the instance to convert; its ``name`` and ``ontology_uri``
            are reused for the replacement.
        target_model: the model class to create the replacement with; it must
            expose ``objects.get_or_create`` and accept ``name`` and
            ``ontology_uri``.

    Returns:
        The ``(specific_entity, created)`` tuple returned by ``get_or_create``.

    Raises:
        IntegrityError: if the delete/create step fails with an integrity
            error. The original error is attached as ``__cause__``.
    """
    # Snapshot the values up front so nothing below depends on the state of
    # the instance after delete() has run.
    name = entity_meta.name
    ontology_uri = entity_meta.ontology_uri

    try:
        with transaction.atomic():
            # Delete the anatomical entity in the incorrect type
            entity_meta.delete()
            # Create a new anatomical entity in the new specific type
            specific_entity, created = target_model.objects.get_or_create(
                ontology_uri=ontology_uri,
                defaults={'name': name, 'ontology_uri': ontology_uri},
            )
            return specific_entity, created
    except IntegrityError as e:
        # Chain explicitly so the traceback shows the database error as the
        # direct cause of the more descriptive one raised here.
        raise IntegrityError(
            f"Failed to convert AnatomicalEntityMeta to {target_model.__name__} due to integrity error: {e}"
        ) from e


def add_notes(connectivity_statement: ConnectivityStatement, statement: Dict):
Expand Down Expand Up @@ -522,7 +570,8 @@ def update_forward_connections(statements: List):
try:
forward_statement = ConnectivityStatement.objects.get(reference_uri=uri)
except ConnectivityStatement.DoesNotExist:
assert connectivity_statement.state == CSState.INVALID
assert connectivity_statement.state == CSState.INVALID, \
f"connectivity_statement {connectivity_statement} should be invalid due to forward connection {uri} not found but it isn't"
continue
connectivity_statement.forward_connection.add(forward_statement)

Expand Down

0 comments on commit 253d241

Please sign in to comment.