Revert "Add support for stacked sinks (#611)" #612

Merged 1 commit on Dec 6, 2024
94 changes: 0 additions & 94 deletions docs/examples/concepts/sink.py

This file was deleted.

6 changes: 0 additions & 6 deletions docs/pages/api-reference/decorators/sink.md
@@ -62,12 +62,6 @@ This means that the column denoted by the key is aliased to another column in the
This is useful, for instance, when you want to rename columns while sinking them.
</Expandable>

-<Expandable title="stacked" type='Optional[bool]' defaultVal="None">
-This indicates whether the sink is stacked on top of an existing sink.
-If multiple sinks are added to the dataset, set this field to True
-for all sinks except the first one.
-</Expandable>
-
<pre snippet="api-reference/sinks/sink#sink_decorator"
status="success" message="Specifying options in sink decorator"
>
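For reference, this is roughly what the removed option looked like in use. The sketch below is reconstructed from the test deleted further down in `fennel/connectors/test_connectors.py`; the `http_with_secret` connector construction is omitted, and the timestamp field is an assumption, so treat it as an illustration rather than a verified setup:

```python
from datetime import datetime

from fennel.connectors import sink
from fennel.datasets import dataset, field

# `http_with_secret` is the connector object used in the deleted test; its
# construction (an HTTP connector backed by AWS secrets) is not shown here.
@sink(
    http_with_secret.path(endpoint="/sink", limit=100, headers={"Foo": "Bar"}),
    cdc="debezium",
    env=["prod_new4"],
)
@sink(
    http_with_secret.path(endpoint="/sink3", limit=100, headers={"Foo": "Bar"}),
    cdc="debezium",
    env=["prod_new4"],
    stacked=True,  # removed by this revert: True for every sink after the first
)
@dataset
class UserInfoDatasetDerived:
    user_id: int = field(key=True)
    ts: datetime  # assumed timestamp field; the full schema is in the test
```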
10 changes: 1 addition & 9 deletions docs/pages/concepts/sink.md
@@ -26,12 +26,4 @@ publishing changes to your Kafka.

Fennel ships with data sinks to a couple of [common datastores](/api-reference/sink_connectors)
so that you can 'sink' from your Fennel datasets to your external datasets.
-Sinks to many other common data stores will be added soon.
-
-## Stacking Multiple Sinks
-Stacking two (or more) sinks on top of a dataset simply leads to the dataset
-getting sinked to all of them.
-
-<pre snippet="concepts/sink#stacked" status="success"
-message="Writing data to Snowflake and Kafka"
-></pre>
+Sinks to many other common data stores will be added soon.
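As a hedged illustration of the sink decorator that remains after this revert: the option names (`renames`, `since`, `env`) come from the `sink()` signature in the `fennel/connectors/connectors.py` hunk below, while the `snowflake` connector, the `.table(...)` call, and all values are stand-ins rather than verified API:

```python
from datetime import datetime, timezone

from fennel.connectors import sink
from fennel.datasets import dataset, field

# Sketch only: `snowflake` stands in for a configured Snowflake connector,
# and `.table(...)` mirrors the connector-path style seen in the tests.
@sink(
    snowflake.table("user_info_export"),
    cdc="debezium",
    renames={"uid": "user_id"},  # alias column `uid` to `user_id` in the sink
    since=datetime(2024, 1, 1, tzinfo=timezone.utc),
    env=["prod"],
)
@dataset
class UserInfo:
    uid: int = field(key=True)
    ts: datetime  # assumed timestamp field
```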
3 changes: 0 additions & 3 deletions fennel/CHANGELOG.md
@@ -1,8 +1,5 @@
# Changelog

-## [1.5.59] - 2024-12-06
-- Add support for stacked sinks
-
## [1.5.58] - 2024-11-24
- Allow min/max aggregation on date, datetime and decimal dtypes

3 changes: 0 additions & 3 deletions fennel/connectors/connectors.py
@@ -252,7 +252,6 @@ def sink(
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
    env: Optional[Union[str, List[str]]] = None,
-    stacked: Optional[bool] = None,
) -> Callable[[T], Any]:
    if not isinstance(conn, DataConnector):
        if not isinstance(conn, DataSource):
@@ -353,7 +352,6 @@ def decorator(dataset_cls: T):
        conn.renames = renames
        conn.since = since
        conn.until = until
-        conn.stacked = stacked
        conn.envs = EnvSelector(env)
        connectors = getattr(dataset_cls, SINK_FIELD, [])
        connectors.append(conn)
@@ -823,7 +821,6 @@ class DataConnector:
    how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
    create: Optional[bool] = None
    renames: Optional[Dict[str, str]] = {}
-    stacked: Optional[bool] = None

    def identifier(self):
        raise NotImplementedError
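The `decorator` hunk above shows the registration mechanism: each `@sink` application appends its connector to a list stored on the dataset class under `SINK_FIELD`. Here is a simplified, self-contained sketch of that pattern; the attribute value and the reduced signature are assumptions for illustration, not fennel's exact internals:

```python
from typing import Any, Callable, List, TypeVar

T = TypeVar("T")
SINK_FIELD = "__fennel_sinks__"  # assumed attribute name, for illustration only

def sink(conn: Any, **options: Any) -> Callable[[T], T]:
    """Reduced sketch: register `conn` on the decorated dataset class."""
    def decorator(dataset_cls: T) -> T:
        # Decorators apply bottom-up, so sinks added by decorators written
        # lower in the source are already present on the class.
        connectors: List[Any] = list(getattr(dataset_cls, SINK_FIELD, []))
        connectors.append(conn)
        setattr(dataset_cls, SINK_FIELD, connectors)
        return dataset_cls
    return decorator
```

With the `stacked` flag gone, nothing in this registration path distinguishes the first sink from later ones; the revert simply drops the field rather than changing the append logic.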
51 changes: 2 additions & 49 deletions fennel/connectors/test_connectors.py
@@ -680,14 +680,6 @@ class UserInfoDataset:
cdc="debezium",
env=["prod_new4"],
)
@sink(
http_with_secret.path(
endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
),
cdc="debezium",
env=["prod_new4"],
stacked=True,
)
@dataset
class UserInfoDatasetDerived:
user_id: int = field(key=True)
@@ -942,47 +934,9 @@ def create_user_transactions(cls, dataset: Dataset):
    sync_request = view._get_sync_request_proto(env="prod_new4")
    assert len(sync_request.datasets) == 2
    assert len(sync_request.sources) == 1
-    assert len(sync_request.sinks) == 2
+    assert len(sync_request.sinks) == 1
    assert len(sync_request.extdbs) == 2

-    sink_request = sync_request.sinks[1]
-    s = {
-        "table": {
-            "http_path": {
-                "db": {
-                    "name": "http_sink_with_secret",
-                    "http": {
-                        "host_secret": {
-                            "secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
-                            "role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
-                            "path": ["http_host"],
-                        },
-                        "healthz": "/health",
-                        "ca_cert": {
-                            "secret_ref": {
-                                "secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
-                                "role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
-                                "path": ["ca_cert"],
-                            },
-                        },
-                    },
-                },
-                "endpoint": "/sink",
-                "limit": 100,
-                "headers": {
-                    "Foo": "Bar",
-                },
-            },
-        },
-        "dataset": "UserInfoDatasetDerived",
-        "dsVersion": 1,
-        "cdc": "Debezium",
-    }
-    expected_sink_request = ParseDict(s, connector_proto.Sink())
-    assert sink_request == expected_sink_request, error_message(
-        sink_request, expected_sink_request
-    )
-
    sink_request = sync_request.sinks[0]
    s = {
        "table": {
@@ -1005,7 +959,7 @@
                        },
                    },
                },
-                "endpoint": "/sink3",
+                "endpoint": "/sink",
                "limit": 100,
                "headers": {
                    "Foo": "Bar",
@@ -1015,7 +969,6 @@
"dataset": "UserInfoDatasetDerived",
"dsVersion": 1,
"cdc": "Debezium",
"stacked": True,
}
expected_sink_request = ParseDict(s, connector_proto.Sink())
assert sink_request == expected_sink_request, error_message(
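The test builds its expected message by parsing a plain dict into a protobuf object with `ParseDict` and comparing messages with `==`. A minimal runnable sketch of the same pattern using the well-known `Struct` type, where the fennel test uses `connector_proto.Sink()` in the same role:

```python
from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Struct

# Parse a JSON-style dict into a protobuf message, then compare messages
# directly; protobuf Python messages implement field-by-field equality.
expected = ParseDict({"endpoint": "/sink", "limit": 100}, Struct())
actual = ParseDict({"endpoint": "/sink", "limit": 100}, Struct())
assert actual == expected
```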