Skip to content

Commit

Permalink
Revert "Add support for stacked sinks (#611)" (#612)
Browse files Browse the repository at this point in the history
This reverts commit 725eb40.
  • Loading branch information
saiharshavellanki authored Dec 6, 2024
1 parent 725eb40 commit af3c39e
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 400 deletions.
94 changes: 0 additions & 94 deletions docs/examples/concepts/sink.py

This file was deleted.

6 changes: 0 additions & 6 deletions docs/pages/api-reference/decorators/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ This means that the column denoted by the key is aliased to another column in th
This is useful, for instance, when you want to rename columns while sinking them.
</Expandable>

<Expandable title="stacked" type='Optional[bool]' defaultVal="None">
This indicates whether the sink is stacked on top of an existing sink.
If multiple sinks are added to the dataset, set this field to True
for all sinks except the first one.
</Expandable>

<pre snippet="api-reference/sinks/sink#sink_decorator"
status="success" message="Specifying options in sink decorator"
>
Expand Down
10 changes: 1 addition & 9 deletions docs/pages/concepts/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,4 @@ publishing changes to your Kafka.

Fennel ships with data sinks to a couple of [common datastores](/api-reference/sink_connectors)
so that you can 'sink' from your Fennel datasets to your external datasets.
Sinks to many other common data stores will be added soon.

## Stacking Multiple Sinks
Stacking two (or more) sinks on top of a dataset simply leads to the dataset
getting sinked to all of them.

<pre snippet="concepts/sink#stacked" status="success"
message="Writing data to Snowflake and Kafka"
></pre>
Sinks to many other common data stores will be added soon.
3 changes: 0 additions & 3 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# Changelog

## [1.5.59] - 2024-12-06
- Add support for stacked sinks

## [1.5.58] - 2024-11-24
- Allow min/max aggregation on date, datetime and decimal dtypes

Expand Down
3 changes: 0 additions & 3 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ def sink(
since: Optional[datetime] = None,
until: Optional[datetime] = None,
env: Optional[Union[str, List[str]]] = None,
stacked: Optional[bool] = None,
) -> Callable[[T], Any]:
if not isinstance(conn, DataConnector):
if not isinstance(conn, DataSource):
Expand Down Expand Up @@ -353,7 +352,6 @@ def decorator(dataset_cls: T):
conn.renames = renames
conn.since = since
conn.until = until
conn.stacked = stacked
conn.envs = EnvSelector(env)
connectors = getattr(dataset_cls, SINK_FIELD, [])
connectors.append(conn)
Expand Down Expand Up @@ -823,7 +821,6 @@ class DataConnector:
how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
create: Optional[bool] = None
renames: Optional[Dict[str, str]] = {}
stacked: Optional[bool] = None

def identifier(self):
raise NotImplementedError
Expand Down
51 changes: 2 additions & 49 deletions fennel/connectors/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,14 +680,6 @@ class UserInfoDataset:
cdc="debezium",
env=["prod_new4"],
)
@sink(
http_with_secret.path(
endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
),
cdc="debezium",
env=["prod_new4"],
stacked=True,
)
@dataset
class UserInfoDatasetDerived:
user_id: int = field(key=True)
Expand Down Expand Up @@ -942,47 +934,9 @@ def create_user_transactions(cls, dataset: Dataset):
sync_request = view._get_sync_request_proto(env="prod_new4")
assert len(sync_request.datasets) == 2
assert len(sync_request.sources) == 1
assert len(sync_request.sinks) == 2
assert len(sync_request.sinks) == 1
assert len(sync_request.extdbs) == 2

sink_request = sync_request.sinks[1]
s = {
"table": {
"http_path": {
"db": {
"name": "http_sink_with_secret",
"http": {
"host_secret": {
"secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
"role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
"path": ["http_host"],
},
"healthz": "/health",
"ca_cert": {
"secret_ref": {
"secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
"role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
"path": ["ca_cert"],
},
},
},
},
"endpoint": "/sink",
"limit": 100,
"headers": {
"Foo": "Bar",
},
},
},
"dataset": "UserInfoDatasetDerived",
"dsVersion": 1,
"cdc": "Debezium",
}
expected_sink_request = ParseDict(s, connector_proto.Sink())
assert sink_request == expected_sink_request, error_message(
sink_request, expected_sink_request
)

sink_request = sync_request.sinks[0]
s = {
"table": {
Expand All @@ -1005,7 +959,7 @@ def create_user_transactions(cls, dataset: Dataset):
},
},
},
"endpoint": "/sink3",
"endpoint": "/sink",
"limit": 100,
"headers": {
"Foo": "Bar",
Expand All @@ -1015,7 +969,6 @@ def create_user_transactions(cls, dataset: Dataset):
"dataset": "UserInfoDatasetDerived",
"dsVersion": 1,
"cdc": "Debezium",
"stacked": True,
}
expected_sink_request = ParseDict(s, connector_proto.Sink())
assert sink_request == expected_sink_request, error_message(
Expand Down
Loading

0 comments on commit af3c39e

Please sign in to comment.