From 0dc9c1d1be732bfdc0bfe999d63c8447615d668b Mon Sep 17 00:00:00 2001 From: Delsin Van Grembergen Date: Tue, 5 Nov 2024 14:49:10 +0100 Subject: [PATCH 1/3] feat: Add top level fields mapping and external_file_sources --- src/elody/csv.py | 56 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/src/elody/csv.py b/src/elody/csv.py index 3c6f272..c1de3f8 100644 --- a/src/elody/csv.py +++ b/src/elody/csv.py @@ -13,7 +13,7 @@ class CSVParser: - top_level_fields = ["type", "filename"] + top_level_fields = ["type", "filename", "file_identifier"] identifier_fields = ["identifiers", "identifier", "object_id", "entity_id"] schema_mapping = { "entity": entity_schema, @@ -125,6 +125,8 @@ def __init__( required_metadata_values=None, metadata_field_mapping=None, include_indexed_field=False, + top_level_fields_mapping=None, + external_file_sources=None, ): super().__init__(csvstring) self.index_mapping = index_mapping if index_mapping else dict() @@ -140,7 +142,14 @@ def __init__( self.objects = dict() self.errors = dict() self.include_indexed_field = include_indexed_field + self.top_level_fields_mapping = ( + top_level_fields_mapping if top_level_fields_mapping else dict() + ) + self.external_file_sources = ( + external_file_sources if external_file_sources else [] + ) self.__fill_objects_from_csv() + self.__rename_top_level_fields() def get_entities(self): return self.objects.get("entities", list()) @@ -148,6 +157,9 @@ def get_entities(self): def get_errors(self): return self.errors + def get_top_level_fields_mapping(self, type): + return self.top_level_fields_mapping.get(type, {}) + def get_mediafiles(self): return self.objects.get("mediafiles", list()) @@ -163,7 +175,9 @@ def __field_allowed(self, target_object_type, key, value): def __fill_objects_from_csv(self): indexed_dict = dict() + external_mediafiles_ids = [] for row in self.reader: + external_mediafiles = False if not all(x in row.keys() for x in self.index_mapping.values()): raise ColumnNotFoundException( f"Not all identifying columns are present in CSV" @@ -179,7 +193,26 @@ def __fill_objects_from_csv(self): if previous_id: indexed_dict[type][id]["matching_id"] = previous_id previous_id = id + file_source = None for key, value in row.items(): + if key == "file_source": + file_source = value + if ( + key == "file_identifier" + and file_source in self.external_file_sources + ): + if ( + indexed_dict[type][id]["matching_id"] + not in external_mediafiles_ids + ): + external_mediafiles_ids.append( + indexed_dict[type][id]["matching_id"] + ) + external_mediafiles = True + if "entities" not in indexed_dict: + indexed_dict["entities"] = dict() + if id in indexed_dict["entities"]: + indexed_dict["entities"][id]["file_identifier"] = value if self._is_relation_field(key) and self.__field_allowed( type, key, value ): @@ -219,6 +252,12 @@ def __fill_objects_from_csv(self): self.__add_required_fields(indexed_dict) for object_type, objects in indexed_dict.items(): self.objects[object_type] = list(objects.values()) + if external_mediafiles: + self.objects["mediafiles"] = [ + mediafile + for mediafile in self.objects["mediafiles"] + if mediafile["matching_id"] not in external_mediafiles_ids + ] def __add_required_fields(self, indexed_dict): if not self.required_metadata_values: @@ -253,3 +292,18 @@ def __validate_indexed_dict(self, indexed_dict): ) for error_id in error_ids: del objects[error_id] + + def __rename_top_level_fields(self): + def rename_fields(items, mapping): + for item in items: + for old_key, new_key in mapping.items(): + if old_key in item: + item[new_key] = item.pop(old_key) + + mediafiles = self.get_mediafiles() + entities = self.get_entities() + mediafiles_mapping = self.get_top_level_fields_mapping("mediafiles") + entities_mapping = self.get_top_level_fields_mapping("entities") + + rename_fields(mediafiles, mediafiles_mapping) + rename_fields(entities, entities_mapping) From ae606715df122e97987e914791fb2b91c20238c3 Mon Sep 17 00:00:00 2001 From: Delsin Van Grembergen Date: Tue, 5 Nov 2024 14:49:51 +0100 Subject: [PATCH 2/3] fix: remove filename requirement since we will use file_identifier in the csv. --- src/elody/schemas.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/elody/schemas.py b/src/elody/schemas.py index 9e0b5f6..c149b36 100644 --- a/src/elody/schemas.py +++ b/src/elody/schemas.py @@ -105,9 +105,6 @@ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "default": {}, - "required": [ - "filename", - ], "properties": { "filename": { "type": "string", From 9c88ac47a6495e39a367627b4f86deea7eb37abe Mon Sep 17 00:00:00 2001 From: Delsin Van Grembergen Date: Wed, 13 Nov 2024 13:16:51 +0100 Subject: [PATCH 3/3] Make it more generic to define other file_sources --- src/elody/csv.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/elody/csv.py b/src/elody/csv.py index c1de3f8..cfdd145 100644 --- a/src/elody/csv.py +++ b/src/elody/csv.py @@ -177,7 +177,6 @@ def __fill_objects_from_csv(self): indexed_dict = dict() external_mediafiles_ids = [] for row in self.reader: - external_mediafiles = False if not all(x in row.keys() for x in self.index_mapping.values()): raise ColumnNotFoundException( f"Not all identifying columns are present in CSV" @@ -201,14 +200,9 @@ def __fill_objects_from_csv(self): key == "file_identifier" and file_source in self.external_file_sources ): - if ( - indexed_dict[type][id]["matching_id"] - not in external_mediafiles_ids - ): - external_mediafiles_ids.append( - indexed_dict[type][id]["matching_id"] - ) - external_mediafiles = True + matching_id = indexed_dict[type][id]["matching_id"] + if not any(matching_id in id for id in external_mediafiles_ids): + external_mediafiles_ids.append({matching_id: file_source}) if "entities" not in indexed_dict: indexed_dict["entities"] = dict() if id in indexed_dict["entities"]: @@ -252,12 +246,15 @@ def __fill_objects_from_csv(self): self.__add_required_fields(indexed_dict) for object_type, objects in indexed_dict.items(): self.objects[object_type] = list(objects.values()) - if external_mediafiles: - self.objects["mediafiles"] = [ - mediafile - for mediafile in self.objects["mediafiles"] - if mediafile["matching_id"] not in external_mediafiles_ids - ] + if external_mediafiles_ids: + for mediafile in self.objects["mediafiles"]: + matching_id = mediafile["matching_id"] + for entry in external_mediafiles_ids: + if matching_id in entry: + file_source = entry[matching_id] + dynamic_key = f"is_{file_source}_mediafile" + mediafile[dynamic_key] = True + break def __add_required_fields(self, indexed_dict): if not self.required_metadata_values: