diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/pipeline.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/pipeline.py index 2339421..62b1357 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/pipeline.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/pipeline.py @@ -34,6 +34,29 @@ ) +def get_destination(destination: str, default: str, min_size: int = 3): + empty = [None] * min_size + + def complete_parts_with_empty(parts): + q = min_size - len(parts) + return [*empty[:q], *parts] + + # break destination and default into parts and complete the missing + # parts at the begining with None items + destination_parts = complete_parts_with_empty(destination.split(".")) + default_parts = complete_parts_with_empty(default.split(".")) + + # unify parts completing the missing in destination with + # their corresponding from default + result = { + **{i: x for i, x in enumerate(default_parts) if x}, + **{i: x for i, x in enumerate(destination_parts) if x}, + } + + # join all the parts with "." and return it + return ".".join([v for _, v in result.items()]) + + class NormalizationPipeline: def __init__(self, options): self.pipeline = beam.Pipeline(options=options) @@ -46,18 +69,9 @@ def __init__(self, options): self.source_timestamp_field = params.source_timestamp_field self.destination = params.destination self.affected_entities = params.affected_entities.split(",") - - default_vessel_info_parts = self.destination.split(".")[:2] + [ - "reported_vessel_info" - ] - dst_vessel_info_parts = [ - x for x in params.destination_vessel_info.split(".") if x - ] - while len(dst_vessel_info_parts) < 3: - index = len(default_vessel_info_parts) - len(dst_vessel_info_parts) - 1 - dst_vessel_info_parts.insert(0, default_vessel_info_parts[index]) - - self.destination_vessel_info = ".".join(dst_vessel_info_parts) + self.destination_vessel_info = get_destination( + destination=params.destination_vessel_info, default=params.destination + ) self.start_date = parse_yyyy_mm_dd_param(params.start_date) self.end_date = parse_yyyy_mm_dd_param(params.end_date) self.labels = list_to_dict(gCloudParams.labels)