Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for stacked sinks #611

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions docs/examples/concepts/sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import os
import sys
from datetime import datetime
from fennel.expr import col
from fennel.testing import mock

__owner__ = "[email protected]"

os.environ["KAFKA_HOST"] = "some-host"
os.environ["KAFKA_USERNAME"] = "some-username"
os.environ["KAFKA_PASSWORD"] = "some-password"
os.environ["DB_NAME"] = "some-db"
os.environ["SNOWFLAKE_USERNAME"] = "snowflake-username"
os.environ["SNOWFLAKE_PASSWORD"] = "snowflake-password"


@mock
def test_stacked_sinks(client):
from fennel.connectors import Kafka, S3, eval, source
from fennel.datasets import dataset

kafka = Kafka(
name="my_kafka",
bootstrap_servers="localhost:9092",
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username="user",
sasl_plain_password="password",
)
s3 = S3(name="mys3")
cutoff = datetime(2024, 1, 1, 0, 0, 0)
bucket = s3.bucket("data", path="orders")

# docsnip-highlight start
@source(bucket, disorder="1w", cdc="append", until=cutoff)
@source(kafka.topic("order"), disorder="1d", cdc="append", since=cutoff)
# docsnip-highlight end
@dataset
class UserLocation:
uid: int
city: str
country: str
update_time: datetime

# docsnip stacked
from fennel.connectors import Kafka, Snowflake, sink
from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib.params import inputs

kafka = Kafka(
name="kafka_src",
bootstrap_servers=os.environ["KAFKA_HOST"],
security_protocol="PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username=os.environ["KAFKA_USERNAME"],
sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)
snowflake = Snowflake(
name="my_snowflake",
account="VPECCVJ-MUB03765",
warehouse="TEST",
db_name=os.environ["DB_NAME"],
schema="PUBLIC",
role="ACCOUNTADMIN",
username=os.environ["SNOWFLAKE_USERNAME"],
password=os.environ["SNOWFLAKE_PASSWORD"],
)

# docsnip-highlight start
@sink(
snowflake.table("test_table"),
every="1d",
how="incremental",
renames={"uid": "new_uid"},
stacked=True,
)
@sink(kafka.topic("user_location"), cdc="debezium")
# docsnip-highlight end
@dataset
class UserLocationFiltered:
uid: int
city: str
country: str
update_time: datetime

@pipeline
@inputs(UserLocation)
def user_location_count(cls, dataset: Dataset):
return dataset.filter(lambda row: row["country"] != "United States")

# /docsnip
client.commit(
message="some commit msg", datasets=[UserLocationFiltered, UserLocation]
)
6 changes: 6 additions & 0 deletions docs/pages/api-reference/decorators/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ This means that the column denoted by the key is aliased to another column in th
This is useful, for instance, when you want to rename columns while sinking them.
</Expandable>

<Expandable title="stacked" type='Optional[bool]' defaultVal="None">
This indicates whether the sink is stacked on top of an existing sink.
If multiple sinks are added to the dataset, set this field to True
for all sinks except the first one.
</Expandable>

<pre snippet="api-reference/sinks/sink#sink_decorator"
status="success" message="Specifying options in sink decorator"
>
Expand Down
10 changes: 9 additions & 1 deletion docs/pages/concepts/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ publishing changes to your Kafka.

Fennel ships with data sinks to a couple of [common datastores](/api-reference/sink_connectors)
so that you can 'sink' from your Fennel datasets to your external datasets.
Sinks to many other common data stores will be added soon.
Sinks to many other common data stores will be added soon.

## Stacking Multiple Sinks
Stacking two (or more) sinks on top of a dataset simply leads to the dataset
getting sinked to all of them.

<pre snippet="concepts/sink#stacked" status="success"
message="Writing data to Snowflake and Kafka"
></pre>
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.59] - 2024-12-06
- Add support for stacked sinks

## [1.5.58] - 2024-11-24
- Allow min/max aggregation on date, datetime and decimal dtypes

Expand Down
3 changes: 3 additions & 0 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ def sink(
since: Optional[datetime] = None,
until: Optional[datetime] = None,
env: Optional[Union[str, List[str]]] = None,
stacked: Optional[bool] = None,
) -> Callable[[T], Any]:
if not isinstance(conn, DataConnector):
if not isinstance(conn, DataSource):
Expand Down Expand Up @@ -352,6 +353,7 @@ def decorator(dataset_cls: T):
conn.renames = renames
conn.since = since
conn.until = until
conn.stacked = stacked
conn.envs = EnvSelector(env)
connectors = getattr(dataset_cls, SINK_FIELD, [])
connectors.append(conn)
Expand Down Expand Up @@ -821,6 +823,7 @@ class DataConnector:
how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
create: Optional[bool] = None
renames: Optional[Dict[str, str]] = {}
stacked: Optional[bool] = None

def identifier(self):
raise NotImplementedError
Expand Down
51 changes: 49 additions & 2 deletions fennel/connectors/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ class UserInfoDataset:
cdc="debezium",
env=["prod_new4"],
)
@sink(
http_with_secret.path(
endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
),
cdc="debezium",
env=["prod_new4"],
stacked=True,
)
@dataset
class UserInfoDatasetDerived:
user_id: int = field(key=True)
Expand Down Expand Up @@ -934,10 +942,10 @@ def create_user_transactions(cls, dataset: Dataset):
sync_request = view._get_sync_request_proto(env="prod_new4")
assert len(sync_request.datasets) == 2
assert len(sync_request.sources) == 1
assert len(sync_request.sinks) == 1
assert len(sync_request.sinks) == 2
assert len(sync_request.extdbs) == 2

sink_request = sync_request.sinks[0]
sink_request = sync_request.sinks[1]
s = {
"table": {
"http_path": {
Expand Down Expand Up @@ -975,6 +983,45 @@ def create_user_transactions(cls, dataset: Dataset):
sink_request, expected_sink_request
)

sink_request = sync_request.sinks[0]
s = {
"table": {
"http_path": {
"db": {
"name": "http_sink_with_secret",
"http": {
"host_secret": {
"secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
"role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
"path": ["http_host"],
},
"healthz": "/health",
"ca_cert": {
"secret_ref": {
"secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
"role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
"path": ["ca_cert"],
},
},
},
},
"endpoint": "/sink3",
"limit": 100,
"headers": {
"Foo": "Bar",
},
},
},
"dataset": "UserInfoDatasetDerived",
"dsVersion": 1,
"cdc": "Debezium",
"stacked": True,
}
expected_sink_request = ParseDict(s, connector_proto.Sink())
assert sink_request == expected_sink_request, error_message(
sink_request, expected_sink_request
)


def test_kafka_sink_and_source_doesnt_create_extra_extdbs():
@meta(owner="[email protected]")
Expand Down
Loading
Loading