Skip to content

Commit

Permalink
refactor get destination table for vessel info using default
Browse files Browse the repository at this point in the history
  • Loading branch information
rdgfuentes committed Nov 15, 2024
1 parent 45d3a7c commit 3c71776
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions packages/pipe-vms-ingestion/vms_ingestion/normalization/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,29 @@
)


def get_destination(destination: str, default: str, min_size: int = 3):
empty = [None] * min_size

def complete_parts_with_empty(parts):
q = min_size - len(parts)
return [*empty[:q], *parts]

# break destination and default into parts and complete the missing
# parts at the begining with None items
destination_parts = complete_parts_with_empty(destination.split("."))
default_parts = complete_parts_with_empty(default.split("."))

# unify parts completing the missing in destination with
# their corresponding from default
result = {
**{i: x for i, x in enumerate(default_parts) if x},
**{i: x for i, x in enumerate(destination_parts) if x},
}

# join all the parts with "." and return it
return ".".join([v for _, v in result.items()])


class NormalizationPipeline:
def __init__(self, options):
self.pipeline = beam.Pipeline(options=options)
Expand All @@ -46,18 +69,9 @@ def __init__(self, options):
self.source_timestamp_field = params.source_timestamp_field
self.destination = params.destination
self.affected_entities = params.affected_entities.split(",")

default_vessel_info_parts = self.destination.split(".")[:2] + [
"reported_vessel_info"
]
dst_vessel_info_parts = [
x for x in params.destination_vessel_info.split(".") if x
]
while len(dst_vessel_info_parts) < 3:
index = len(default_vessel_info_parts) - len(dst_vessel_info_parts) - 1
dst_vessel_info_parts.insert(0, default_vessel_info_parts[index])

self.destination_vessel_info = ".".join(dst_vessel_info_parts)
self.destination_vessel_info = get_destination(
destination=params.destination_vessel_info, default=params.destination
)
self.start_date = parse_yyyy_mm_dd_param(params.start_date)
self.end_date = parse_yyyy_mm_dd_param(params.end_date)
self.labels = list_to_dict(gCloudParams.labels)
Expand Down

0 comments on commit 3c71776

Please sign in to comment.