Add support for stacked sinks (#611)
saiharshavellanki authored Dec 6, 2024
1 parent 417a95d commit 725eb40
Showing 11 changed files with 400 additions and 15 deletions.
94 changes: 94 additions & 0 deletions docs/examples/concepts/sink.py
@@ -0,0 +1,94 @@
import os
import sys
from datetime import datetime
from fennel.expr import col
from fennel.testing import mock

__owner__ = "[email protected]"

os.environ["KAFKA_HOST"] = "some-host"
os.environ["KAFKA_USERNAME"] = "some-username"
os.environ["KAFKA_PASSWORD"] = "some-password"
os.environ["DB_NAME"] = "some-db"
os.environ["SNOWFLAKE_USERNAME"] = "snowflake-username"
os.environ["SNOWFLAKE_PASSWORD"] = "snowflake-password"


@mock
def test_stacked_sinks(client):
    from fennel.connectors import Kafka, S3, eval, source
    from fennel.datasets import dataset

    kafka = Kafka(
        name="my_kafka",
        bootstrap_servers="localhost:9092",
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="PLAIN",
        sasl_plain_username="user",
        sasl_plain_password="password",
    )
    s3 = S3(name="mys3")
    cutoff = datetime(2024, 1, 1, 0, 0, 0)
    bucket = s3.bucket("data", path="orders")

    # docsnip-highlight start
    @source(bucket, disorder="1w", cdc="append", until=cutoff)
    @source(kafka.topic("order"), disorder="1d", cdc="append", since=cutoff)
    # docsnip-highlight end
    @dataset
    class UserLocation:
        uid: int
        city: str
        country: str
        update_time: datetime

    # docsnip stacked
    from fennel.connectors import Kafka, Snowflake, sink
    from fennel.datasets import dataset, pipeline, Dataset
    from fennel.lib.params import inputs

    kafka = Kafka(
        name="kafka_src",
        bootstrap_servers=os.environ["KAFKA_HOST"],
        security_protocol="PLAINTEXT",
        sasl_mechanism="PLAIN",
        sasl_plain_username=os.environ["KAFKA_USERNAME"],
        sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    )
    snowflake = Snowflake(
        name="my_snowflake",
        account="VPECCVJ-MUB03765",
        warehouse="TEST",
        db_name=os.environ["DB_NAME"],
        schema="PUBLIC",
        role="ACCOUNTADMIN",
        username=os.environ["SNOWFLAKE_USERNAME"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
    )

    # docsnip-highlight start
    @sink(
        snowflake.table("test_table"),
        every="1d",
        how="incremental",
        renames={"uid": "new_uid"},
        stacked=True,
    )
    @sink(kafka.topic("user_location"), cdc="debezium")
    # docsnip-highlight end
    @dataset
    class UserLocationFiltered:
        uid: int
        city: str
        country: str
        update_time: datetime

        @pipeline
        @inputs(UserLocation)
        def user_location_count(cls, dataset: Dataset):
            return dataset.filter(lambda row: row["country"] != "United States")

    # /docsnip
    client.commit(
        message="some commit msg", datasets=[UserLocationFiltered, UserLocation]
    )
6 changes: 6 additions & 0 deletions docs/pages/api-reference/decorators/sink.md
@@ -62,6 +62,12 @@
This means that the column denoted by the key is aliased to another column in the sink.
This is useful, for instance, when you want to rename columns while sinking them.
</Expandable>

<Expandable title="stacked" type='Optional[bool]' defaultVal="None">
This indicates whether the sink is stacked on top of an existing sink.
If multiple sinks are added to the dataset, set this field to True
for all sinks except the first one.
</Expandable>
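
A minimal sketch of the pattern (not one of this page's snippets; the connector arguments
below are placeholders): the bottom-most `@sink` is the first sink and leaves `stacked`
unset, while every sink declared above it sets `stacked=True`.

```python
from datetime import datetime

from fennel.connectors import Kafka, Snowflake, sink
from fennel.datasets import dataset

# Placeholder connector configs; substitute real credentials.
kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",
    security_protocol="PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="user",
    sasl_plain_password="password",
)
snowflake = Snowflake(
    name="my_snowflake",
    account="some-account",
    warehouse="TEST",
    db_name="some-db",
    schema="PUBLIC",
    role="ACCOUNTADMIN",
    username="user",
    password="password",
)


# Second sink: stacked on top of the Kafka sink, so stacked=True.
@sink(
    snowflake.table("user_location"),
    every="1d",
    how="incremental",
    stacked=True,
)
# First sink: stacked is left unset.
@sink(kafka.topic("user_location"), cdc="debezium")
@dataset
class UserLocation:
    uid: int
    city: str
    country: str
    update_time: datetime
```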

<pre snippet="api-reference/sinks/sink#sink_decorator"
status="success" message="Specifying options in sink decorator"
>
10 changes: 9 additions & 1 deletion docs/pages/concepts/sink.md
@@ -26,4 +26,12 @@
publishing changes to your Kafka.

Fennel ships with data sinks to a couple of [common datastores](/api-reference/sink_connectors)
so that you can 'sink' from your Fennel datasets to your external datasets.
Sinks to many other common data stores will be added soon.

## Stacking Multiple Sinks
Stacking two (or more) sinks on top of a dataset simply means that the dataset
is written to all of them.

<pre snippet="concepts/sink#stacked" status="success"
message="Writing data to Snowflake and Kafka"
></pre>
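
Once committed, every row the dataset produces is delivered to each of its sinks.
A minimal sketch (assuming a Fennel `client` and the datasets defined in the snippet above):

```python
client.commit(
    message="sink UserLocationFiltered to Snowflake and Kafka",
    datasets=[UserLocation, UserLocationFiltered],
)
# UserLocationFiltered now flows to both the Kafka topic and the Snowflake
# table declared in its two @sink decorators.
```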
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [1.5.59] - 2024-12-06
- Add support for stacked sinks

## [1.5.58] - 2024-11-24
- Allow min/max aggregation on date, datetime and decimal dtypes

3 changes: 3 additions & 0 deletions fennel/connectors/connectors.py
@@ -252,6 +252,7 @@ def sink(
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
    env: Optional[Union[str, List[str]]] = None,
    stacked: Optional[bool] = None,
) -> Callable[[T], Any]:
    if not isinstance(conn, DataConnector):
        if not isinstance(conn, DataSource):
@@ -352,6 +353,7 @@ def decorator(dataset_cls: T):
conn.renames = renames
conn.since = since
conn.until = until
conn.stacked = stacked
conn.envs = EnvSelector(env)
connectors = getattr(dataset_cls, SINK_FIELD, [])
connectors.append(conn)
Expand Down Expand Up @@ -821,6 +823,7 @@ class DataConnector:
how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
create: Optional[bool] = None
renames: Optional[Dict[str, str]] = {}
stacked: Optional[bool] = None

def identifier(self):
raise NotImplementedError
51 changes: 49 additions & 2 deletions fennel/connectors/test_connectors.py
@@ -680,6 +680,14 @@ class UserInfoDataset:
cdc="debezium",
env=["prod_new4"],
)
@sink(
http_with_secret.path(
endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
),
cdc="debezium",
env=["prod_new4"],
stacked=True,
)
@dataset
class UserInfoDatasetDerived:
user_id: int = field(key=True)
@@ -934,10 +942,10 @@ def create_user_transactions(cls, dataset: Dataset):
sync_request = view._get_sync_request_proto(env="prod_new4")
assert len(sync_request.datasets) == 2
assert len(sync_request.sources) == 1
assert len(sync_request.sinks) == 1
assert len(sync_request.sinks) == 2
assert len(sync_request.extdbs) == 2

sink_request = sync_request.sinks[0]
sink_request = sync_request.sinks[1]
s = {
    "table": {
        "http_path": {
@@ -975,6 +983,45 @@ def create_user_transactions(cls, dataset: Dataset):
    sink_request, expected_sink_request
)

sink_request = sync_request.sinks[0]
s = {
    "table": {
        "http_path": {
            "db": {
                "name": "http_sink_with_secret",
                "http": {
                    "host_secret": {
                        "secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
                        "role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
                        "path": ["http_host"],
                    },
                    "healthz": "/health",
                    "ca_cert": {
                        "secret_ref": {
                            "secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
                            "role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
                            "path": ["ca_cert"],
                        },
                    },
                },
            },
            "endpoint": "/sink3",
            "limit": 100,
            "headers": {
                "Foo": "Bar",
            },
        },
    },
    "dataset": "UserInfoDatasetDerived",
    "dsVersion": 1,
    "cdc": "Debezium",
    "stacked": True,
}
expected_sink_request = ParseDict(s, connector_proto.Sink())
assert sink_request == expected_sink_request, error_message(
    sink_request, expected_sink_request
)


def test_kafka_sink_and_source_doesnt_create_extra_extdbs():
@meta(owner="[email protected]")