Skip to content

Commit

Permalink
Merge pull request #20 from DanielPDWalker/dev
Browse files Browse the repository at this point in the history
Feature add park detail and child stream
  • Loading branch information
DanielPDWalker authored Oct 10, 2023
2 parents 9c1ee37 + 98aeb27 commit bb7bfc0
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 55 deletions.
58 changes: 31 additions & 27 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tap-theme-parks"
version = "0.0.1"
version = "0.0.2"
description = "`tap-theme-parks` is a Singer tap for theme parks, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Daniel Walker"]
Expand Down
102 changes: 79 additions & 23 deletions tap_theme_parks/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ class DestinationsStream(ThemeParksStream):

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"entity_id": record["id"]}
return {
"destination_id": record["id"],
"park_ids": [x["id"] for x in record["parks"]],
}


class DestinationDetailsStream(ThemeParksStream):
"""Define destination details stream"""
class ParkDetailsStream(ThemeParksStream):
"""Define park details stream"""

parent_stream_type = DestinationsStream
name = "destination_detail"
path = "/entity/{entity_id}"
name = "park_detail"
path_template = "/entity/{park_id}"
primary_keys = ["id"]
replication_key = None

Expand All @@ -63,13 +66,22 @@ class DestinationDetailsStream(ThemeParksStream):
th.Property("externalId", th.StringType),
).to_dict()

def request_records(self, context: dict | None) -> Iterable[dict]:
for id in context["park_ids"]:
self.path = self.path_template.format(park_id=id)
yield from super().request_records(context)

class DestinationChildrenStream(ThemeParksStream):
"""Define destination children stream"""
def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"park_id": record["id"]}

parent_stream_type = DestinationsStream
name = "destination_children"
path = "/entity/{entity_id}/children"

class ParkChildrenStream(ThemeParksStream):
"""Define park children stream"""

parent_stream_type = ParkDetailsStream
name = "park_children"
path = "/entity/{park_id}/children"
primary_keys = ["id"]
replication_key = None

Expand All @@ -93,30 +105,69 @@ class DestinationChildrenStream(ThemeParksStream):
).to_dict()


class LiveDataParentStream(ThemeParksStream):
"""Live data parent stream, used to pass user supplied ids from config to the LiveDataStream"""
class DestinationDetailsStream(ThemeParksStream):
"""Define destination details stream"""

name = "live_data_parent_stream"
parent_stream_type = DestinationsStream
name = "destination_detail"
path = "/entity/{destination_id}"
primary_keys = ["id"]
replication_key = None

schema = th.PropertiesList(th.Property("id", th.StringType)).to_dict()
schema = th.PropertiesList(
th.Property("id", th.StringType),
th.Property("name", th.StringType),
th.Property("slug", th.StringType),
th.Property(
"location",
th.ObjectType(
th.Property("latitude", th.NumberType),
th.Property("longitude", th.NumberType),
th.Property("pointOfInterest", th.ArrayType(th.StringType)),
),
),
th.Property("parentId", th.StringType),
th.Property("timezone", th.StringType),
th.Property("entityType", th.StringType),
th.Property("destinationId", th.StringType),
th.Property("externalId", th.StringType),
).to_dict()

def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]:
"""Return a generator of record-type dictionary objects from config"""
for id in self.config.get("live_data_array"):
yield {"id": id}

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"live_data_id": record["id"]}
class DestinationChildrenStream(ThemeParksStream):
"""Define destination children stream"""

parent_stream_type = DestinationsStream
name = "destination_children"
path = "/entity/{destination_id}/children"
primary_keys = ["id"]
replication_key = None

schema = th.PropertiesList(
th.Property("id", th.StringType),
th.Property("name", th.StringType),
th.Property("entityType", th.StringType),
th.Property("timezone", th.StringType),
th.Property(
"children",
th.ArrayType(
th.ObjectType(
th.Property("id", th.StringType),
th.Property("name", th.StringType),
th.Property("entityType", th.StringType),
th.Property("slug", th.StringType),
th.Property("externalId", th.StringType),
)
),
),
).to_dict()


class LiveDataStream(ThemeParksStream):
"""Define live data stream"""

parent_stream_type = LiveDataParentStream
name = "live_data"
path = "/entity/{live_data_id}/live"
path_template = "/entity/{live_data_id}/live"
primary_keys = ["id"]
replication_key = None

Expand Down Expand Up @@ -193,3 +244,8 @@ class LiveDataStream(ThemeParksStream):
),
),
).to_dict()

def request_records(self, context: dict | None) -> Iterable[dict]:
for id in self.config.get("live_data_array", []):
self.path = self.path_template.format(live_data_id=id)
yield from super().request_records(context)
3 changes: 2 additions & 1 deletion tap_theme_parks/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ def discover_streams(self) -> list[streams.ThemeParksStream]:
streams.DestinationsStream(self),
streams.DestinationDetailsStream(self),
streams.DestinationChildrenStream(self),
streams.ParkDetailsStream(self),
streams.ParkChildrenStream(self),
]

if self.config.get("live_data_array"):
selected_streams.append(streams.LiveDataParentStream(self))
selected_streams.append(streams.LiveDataStream(self))

return selected_streams
Expand Down
6 changes: 3 additions & 3 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ class TestEnabledStreams(unittest.TestCase):
def test_default_streams(self):
catalog = TapThemeParks().discover_streams()

self.assertEqual(len(catalog), 3, "Expected 3 streams from catalog by default")
self.assertEqual(len(catalog), 5, "Expected 5 streams from catalog by default")

def test_streams_with_live_data_array(self):
catalog = TapThemeParks(config=SAMPLE_CONFIG).discover_streams()

self.assertEqual(
len(catalog),
5,
"Expected 5 streams from catalog with the live_data_array setting provided",
6,
"Expected 6 streams from catalog with the live_data_array setting provided",
)

0 comments on commit bb7bfc0

Please sign in to comment.