Skip to content

Commit

Permalink
Filter ingestion messages by date range
Browse files Browse the repository at this point in the history
  • Loading branch information
rdgfuentes committed Aug 16, 2024
1 parent a7a28a8 commit ce31be0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 21 deletions.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
read_excel_to_dict,
)
from vms_ingestion.normalization import build_pipeline_options_with_defaults
from vms_ingestion.options import CommonPipelineOptions

script_path = os.path.dirname(os.path.abspath(__file__))

Expand All @@ -35,8 +36,9 @@ class TestPANIngestion(unittest.TestCase):
"--country_code=pan",
'--source=""',
'--destination=""',
'--start_date=""',
'--end_date=""',
'--start_date="2020-07-01"',
'--end_date="2020-07-02"',
"--fleet=trawler",
]
)

Expand All @@ -60,7 +62,7 @@ class TestPANIngestion(unittest.TestCase):
"mmsi": None,
"msgid": "9704687db8d9d0e0b7b3e53cdb826a1bc9616bb09536c3db059c753dd6283fbe",
"shipname": "NAUTILUS",
"source_fleet": None,
"source_fleet": "TRAWLER",
"source_tenant": "PAN",
"speed": 0.0,
"ssvid": "a8895004fe71fdfea4cf1dfae96e142096cc15f799be7dd264f2cd6ba963fc2b",
Expand All @@ -82,7 +84,7 @@ class TestPANIngestion(unittest.TestCase):
"mmsi": None,
"msgid": "3ed8e3d0533d22c755bc4ebfa042282686b2c05b841b0beebbe149bd63ac045a",
"shipname": "NAUTILUS",
"source_fleet": None,
"source_fleet": "TRAWLER",
"source_tenant": "PAN",
"speed": 3.0,
"ssvid": "a8895004fe71fdfea4cf1dfae96e142096cc15f799be7dd264f2cd6ba963fc2b",
Expand All @@ -104,7 +106,7 @@ class TestPANIngestion(unittest.TestCase):
"mmsi": None,
"msgid": "2fa4279ccd604b5414e67f771334fd1b40ae41aff36e310300b961e33f2e849f",
"shipname": "NAUTILUS",
"source_fleet": None,
"source_fleet": "TRAWLER",
"source_tenant": "PAN",
"speed": None,
"ssvid": "a8895004fe71fdfea4cf1dfae96e142096cc15f799be7dd264f2cd6ba963fc2b",
Expand All @@ -126,7 +128,7 @@ class TestPANIngestion(unittest.TestCase):
"mmsi": None,
"msgid": "df04f7dc62f7113a949ecef1b49f175f37351b52afe2d76543fab1b01825c3cf",
"shipname": "NAUTILUS",
"source_fleet": None,
"source_fleet": "TRAWLER",
"source_tenant": "PAN",
"speed": 0.0,
"ssvid": "a8895004fe71fdfea4cf1dfae96e142096cc15f799be7dd264f2cd6ba963fc2b",
Expand All @@ -141,20 +143,26 @@ def setUp(self):
# Example test that tests the pipeline's transforms.
def test_excel_to_bq(self):
with TestPipeline(options=TestPANIngestion.options) as p:

# Create a PCollection from the RECORDS static input data.
input = p | beam.Create(TestPANIngestion.RECORDS)

self.monkeypatch.setattr(
map_ingested_message, "get_ingested_at", mocked_now
)
# Run ALL the pipeline's transforms (in this case, the Normalize transform).
# Run ALL the pipeline's transforms (in this case, the Ingestion transform).
output: pvalue.PCollection = (
input
| "Read Excel Files" >> ReadMatches()
| "Convert Excel Files to Dict"
>> beam.FlatMap(lambda x: read_excel_to_dict(x.read()))
| "Ingest data" >> FeedIngestionFactory.get_ingestion(feed="pan")
| "Filter messages inside date range"
>> beam.Filter(
lambda x: x["timestamp"] >= datetime.datetime(2020, 7, 1, 0, 0)
and x["timestamp"] < datetime.datetime(2020, 9, 1, 0, 0)
)
| "Map ingested message"
>> map_ingested_message.MapIngestedMessage(feed="pan", fleet="trawler")
)

# Assert that the output PCollection matches the EXPECTED data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

import apache_beam as beam
from shipdataprocess.standardize import standardize_str
from vms_ingestion.ingestion.excel_to_bq.transforms.map_ingested_message import (
MapIngestedMessage,
)


# Function to convert DMS to decimal
Expand Down Expand Up @@ -60,15 +57,9 @@ def get_ingested_at():

class PANIngest(beam.PTransform):

def __init__(self, feed, fleet=None) -> None:
def __init__(self, feed) -> None:
self.feed = feed
self.fleet = fleet

def expand(self, pcoll):

return (
pcoll
| "map position fields" >> beam.Map(lambda x: map_pan_fields(x))
| "Map ingested message"
>> MapIngestedMessage(feed=self.feed, fleet=self.fleet)
)
return pcoll | "Map position fields" >> beam.Map(lambda x: map_pan_fields(x))
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
FeedIngestionFactory,
)
from vms_ingestion.ingestion.excel_to_bq.options import IngestionExcelToBQOptions
from vms_ingestion.ingestion.excel_to_bq.transforms.map_ingested_message import (
MapIngestedMessage,
)
from vms_ingestion.ingestion.excel_to_bq.transforms.read_excel_to_dict import (
read_excel_to_dict,
)
Expand Down Expand Up @@ -77,8 +80,14 @@ def __init__(self, options):
self.pipeline
| "Read source" >> ReadSource(source=self.source)
| "Read Excel Files" >> beam.FlatMap(lambda x: read_excel_to_dict(x.read()))
| "Ingest data"
>> FeedIngestionFactory.get_ingestion(feed=self.feed, fleet=self.fleet)
| "Ingest data" >> FeedIngestionFactory.get_ingestion(feed=self.feed)
| "Filter messages inside date range"
>> beam.Filter(
lambda x: x["timestamp"] >= self.start_date
and x["timestamp"] < self.end_date
)
| "Map ingested message"
>> MapIngestedMessage(feed=self.feed, fleet=self.fleet)
| PickOutputFields(fields=[f"{field}" for field in self.output_fields])
# | "Print" >> beam.Map(print)
| "Write Sink"
Expand Down

0 comments on commit ce31be0

Please sign in to comment.