Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIPELINE-1860 Added Ecuador normalization pipeline #14

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[
{
"timestamp": "2024-04-20 08:14:00+00:00",
"shipname": "NAUTILUS XXX",
"ssvid": "98765",
"imo": null,
"mmsi": null,
"callsign": "TN-00-01234",
"utc_time": "2024-04-20 08:14:00+00:00",
"fechaqth": "2024-04-20 03:14:00+00:00",
"idnave": "98765",
"idqth": "202211234",
"lat": -2.19445,
"lon": -80.0178,
"matriculanave": "TN-00-01234",
"mmsi_1": null,
rdgfuentes marked this conversation as resolved.
Show resolved Hide resolved
"nombrenave": "NAUTILUS XXX",
"rumbo": 117,
"velocidad": 5
},
{
"timestamp": "2024-04-20 08:00:00+00:00",
"shipname": "GUAYATUNA DOS",
"ssvid": "12345",
"imo": null,
"mmsi": "735012345",
"callsign": "P -00-00123",
"utc_time": "2024-04-20 08:00:00+00:00",
"fechaqth": "2024-04-20 03:00:00+00:00",
"idnave": "12345",
"idqth": "202203405",
"lat": -7.611016,
"lon": -101.358451,
"matriculanave": "P -00-00123",
"mmsi_1": "735012345",
"nombrenave": "SANTA MARTA DOS",
"rumbo": 72,
"velocidad": 13
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import os
import unittest
from datetime import date, datetime, timezone

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from tests.util import pcol_equal_to, read_json
from vms_ingestion.normalization import build_pipeline_options_with_defaults
from vms_ingestion.normalization.feeds.ecu_normalize import ECUNormalize

script_path = os.path.dirname(os.path.abspath(__file__))


class TestECUNormalize(unittest.TestCase):

options = build_pipeline_options_with_defaults(
argv=[
"--country_code=ecu",
'--source=""',
'--destination=""',
'--start_date=""',
'--end_date=""',
]
)

# Our input data, which will make up the initial PCollection.
RECORDS = [
{
**x,
"utc_time": datetime.fromisoformat(x["utc_time"]),
}
for x in read_json(f"{script_path}/data/raw_ecuador.json")
]

# Our output data, which is the expected data that the final PCollection must match.
EXPECTED = [
{
"callsign": "TN-00-01234",
"class_b_cs_flag": None,
"course": 117.0,
"destination": None,
"heading": None,
"imo": None,
"ingested_at": None,
"internal_id": "98765",
"lat": -2.19445,
"length": None,
"lon": -80.0178,
"msgid": "0ac0577b3ac22f039da88b4707f42f4f",
"received_at": None,
"receiver": None,
"receiver_type": None,
"shipname": "NAUTILUS XXX",
"shiptype": "NATIONAL TRAFFIC",
"source": "ECUADOR_VMS",
"source_fleet": None,
"source_provider": "DIRNEA",
"source_ssvid": "98765",
"source_tenant": "ECU",
"source_type": "VMS",
"speed": 5.0,
"ssvid": "ECU|i:98765|s:NAUTILUS30|c:TN0001234",
"status": None,
"timestamp": datetime(2024, 4, 20, 8, 14, tzinfo=timezone.utc),
"timestamp_date": date(2024, 4, 20),
"type": "VMS",
"width": None,
},
{
"callsign": "P -00-00123",
"class_b_cs_flag": None,
"course": 72.0,
"destination": None,
"heading": None,
"imo": None,
"ingested_at": None,
"internal_id": "12345",
"lat": -7.611016,
"length": None,
"lon": -101.358451,
"msgid": "b72ff16e9bbac5e90ff6d4b7b499b023",
"received_at": None,
"receiver": None,
"receiver_type": None,
"shipname": "SANTA MARTA DOS",
"shiptype": "FISHING",
"source": "ECUADOR_VMS",
"source_fleet": None,
"source_provider": "DIRNEA",
"source_ssvid": "12345",
"source_tenant": "ECU",
"source_type": "VMS",
"speed": 13.0,
"ssvid": "ECU|i:12345|s:SANTAMARTA2|c:P0000123",
"status": None,
"timestamp": datetime(2024, 4, 20, 8, 0, tzinfo=timezone.utc),
"timestamp_date": date(2024, 4, 20),
"type": "VMS",
"width": None,
},
]

# Example test that tests the pipeline's transforms.
def test_normalize(self):
with TestPipeline(options=TestECUNormalize.options) as p:

# Create a PCollection from the RECORDS static input data.
input = p | beam.Create(TestECUNormalize.RECORDS)

# Run ALL the pipeline's transforms (in this case, the Normalize transform).
output: pvalue.PCollection = input | ECUNormalize(feed="ecu")

# Assert that the output PCollection matches the EXPECTED data.
assert_that(
output, pcol_equal_to(TestECUNormalize.EXPECTED), label="CheckOutput"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import unittest
from datetime import datetime

import apache_beam as beam
import pytest
from apache_beam import pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from tests.util import pcol_equal_to
from vms_ingestion.normalization import build_pipeline_options_with_defaults
from vms_ingestion.normalization.transforms.ecu_map_source_message import (
ECUMapSourceMessage,
ecu_infer_shiptype,
)


class TestECUMapSourceMessage(unittest.TestCase):
options = build_pipeline_options_with_defaults(
argv=[
"--country_code=ecu",
'--source=""',
'--destination=""',
'--start_date=""',
'--end_date=""',
]
)
# Our input data, which will make up the initial PCollection.
RECORDS = [
{
"utc_time": datetime.fromisoformat("2024-01-01 09:07:00+00:00"),
"fechaqth": datetime.fromisoformat("2024-01-01 04:07:00+00:00"),
"idnave": "98765",
"idqth": "205703882",
"lat": -2.5014833333,
"lon": -79.82605,
"matriculanave": "B -00-123456",
"mmsi": None,
"nombrenave": "SANTA MARIA I",
"rumbo": "42",
"velocidad": "21",
},
]

# Our output data, which is the expected data that the final PCollection must match.
EXPECTED = [
{
"callsign": "B -00-123456",
"course": 42.0,
"internal_id": "98765",
"lat": -2.5014833333,
"lon": -79.82605,
"shipname": "SANTA MARIA I",
"shiptype": "boat",
"speed": 21.0,
"timestamp": datetime.fromisoformat("2024-01-01 09:07:00+00:00"),
}
]

# Tests the transform.
def test_ecu_map_source_message(self):
with TestPipeline(options=TestECUMapSourceMessage.options) as p:

# Create a PCollection from the RECORDS static input data.
input = p | beam.Create(TestECUMapSourceMessage.RECORDS)

# Run ALL the pipeline's transforms (in this case, the Normalize transform).
output: pvalue.PCollection = input | ECUMapSourceMessage()

# Assert that the output PCollection matches the EXPECTED data.
assert_that(
output,
pcol_equal_to(TestECUMapSourceMessage.EXPECTED),
label="CheckOutput",
)


@pytest.mark.parametrize(
"matriculanave,shiptype",
[
("B -00-11615", "boat"),
("DA-00-00545", "auxiliary"),
(" B-07-04088 ", "boat"),
("TN -03-10534", "national traffic"),
("TI-00-00002", "international traffic"),
("P -00-00022", "fishing"),
("R -00-00107", "tug"),
(" ", "unknown"),
(None, "unknown"),
("4040077351SP", "unknown"),
],
)
def test_ecu_infer_shiptype(matriculanave, shiptype):
assert ecu_infer_shiptype(matriculanave) == shiptype
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from vms_ingestion.normalization.feeds.bra_normalize import BRANormalize
from vms_ingestion.normalization.feeds.chl_normalize import CHLNormalize
from vms_ingestion.normalization.feeds.cri_normalize import CRINormalize
from vms_ingestion.normalization.feeds.ecu_normalize import ECUNormalize


class FeedNormalizationFactory:
Expand All @@ -15,5 +16,7 @@ def get_normalization(feed) -> beam.PTransform:
return CHLNormalize(feed=feed_id)
elif feed_id == "cri":
return CRINormalize(feed=feed_id)
elif feed_id == "ecu":
return ECUNormalize(feed=feed_id)
else:
raise ValueError(feed)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import apache_beam as beam
from vms_ingestion.normalization.transforms.ecu_map_source_message import (
ECUMapSourceMessage,
)
from vms_ingestion.normalization.transforms.map_normalized_message import (
MapNormalizedMessage,
)


class ECUNormalize(beam.PTransform):

def __init__(self, feed) -> None:
self.feed = feed
self.source_provider = "DIRNEA"
self.source_format = "ECUADOR_VMS"

def expand(self, pcoll):

return (
pcoll
| ECUMapSourceMessage()
| MapNormalizedMessage(
feed=self.feed,
source_provider=self.source_provider,
source_format=self.source_format,
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# from datetime import datetime

import re

import apache_beam as beam

SHIPTYPE_BY_MATRICULA = {
"TI": "international traffic",
"TN": "national traffic",
"P": "fishing",
"R": "tug",
"B": "boat",
"DA": "auxiliary",
}


def ecu_map_source_message(msg):
return {
"shipname": f'{msg["nombrenave"]}'.strip(),
"timestamp": msg["utc_time"],
"lat": float(msg["lat"]),
"lon": float(msg["lon"]),
"speed": float(msg["velocidad"]) if msg.get("velocidad") is not None else None,
"course": float(msg["rumbo"]) if msg.get("rumbo") is not None else None,
"internal_id": f'{msg["idnave"]}' if msg.get("idnave") else None,
"shiptype": ecu_infer_shiptype(msg["matriculanave"]),
"callsign": f'{msg["matriculanave"]}',
}


def ecu_infer_shiptype(matriculanave):
# This code set the type fishing for matriculanave that starts with a
# set of specific strings
prefixes = "|".join(list(SHIPTYPE_BY_MATRICULA.keys()))
p = re.compile(f"^({prefixes}).*$")
code = f"{matriculanave}".upper().strip()

m = p.match(code)
if m:
return SHIPTYPE_BY_MATRICULA[m.group(1)]

return "unknown"


class ECUMapSourceMessage(beam.PTransform):

def expand(self, pcoll):
return pcoll | "Preliminary source fields mapping" >> beam.Map(
ecu_map_source_message
)
Loading