Add support for Keyed Snowflake Sink (#582)
saiharshavellanki authored Oct 14, 2024
1 parent e115fc0 commit b9fa39b
Showing 35 changed files with 555 additions and 66 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
@@ -340,6 +340,7 @@ schemas
sdk
signup
signups
sinked
snapshotted
snowflakecomputing
src
41 changes: 24 additions & 17 deletions docs/api.yml
@@ -50,24 +50,31 @@ sidebar:
- "api-reference/operators/transform"
- "api-reference/operators/union"

- slug: "api-reference/connectors"
title: "Connectors"
- slug: "api-reference/source_connectors"
title: "Source Connectors"
pages:
- "api-reference/connectors/avro"
- "api-reference/connectors/protobuf"
- "api-reference/connectors/bigquery"
- "api-reference/connectors/deltalake"
- "api-reference/connectors/hudi"
- "api-reference/connectors/kafka"
- "api-reference/connectors/kinesis"
- "api-reference/connectors/mongo"
- "api-reference/connectors/mysql"
- "api-reference/connectors/postgres"
- "api-reference/connectors/pubsub"
- "api-reference/connectors/redshift"
- "api-reference/connectors/s3"
- "api-reference/connectors/snowflake"
- "api-reference/connectors/webhook"
- "api-reference/source_connectors/avro"
- "api-reference/source_connectors/protobuf"
- "api-reference/source_connectors/bigquery"
- "api-reference/source_connectors/deltalake"
- "api-reference/source_connectors/hudi"
- "api-reference/source_connectors/kafka"
- "api-reference/source_connectors/kinesis"
- "api-reference/source_connectors/mongo"
- "api-reference/source_connectors/mysql"
- "api-reference/source_connectors/postgres"
- "api-reference/source_connectors/pubsub"
- "api-reference/source_connectors/redshift"
- "api-reference/source_connectors/s3"
- "api-reference/source_connectors/snowflake"
- "api-reference/source_connectors/webhook"

- slug: "api-reference/sink_connectors"
title: "Sink Connectors"
pages:
- "api-reference/sink_connectors/kafka"
- "api-reference/sink_connectors/s3"
- "api-reference/sink_connectors/snowflake"

- slug: "api-reference/aggregations"
title: "Aggregations"
80 changes: 80 additions & 0 deletions docs/examples/api-reference/sinks/snowflake_sink.py
@@ -0,0 +1,80 @@
import os
from datetime import datetime

from fennel.testing import mock

__owner__ = "[email protected]"


@mock
def test_snowflake_sink(client):
os.environ["KAFKA_USERNAME"] = "test"
os.environ["KAFKA_PASSWORD"] = "test"
os.environ["SNOWFLAKE_USERNAME"] = "some-name"
os.environ["SNOWFLAKE_PASSWORD"] = "some-password"
os.environ["DB_NAME"] = "some-db-name"

from fennel.connectors import source, Kafka, Snowflake
from fennel.datasets import dataset, field

kafka = Kafka(
name="my_kafka",
bootstrap_servers="localhost:9092", # could come via os env var too
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username=os.environ["KAFKA_USERNAME"],
sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

# docsnip-highlight next-line
@source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
uid: int = field(key=True)
email: str
timestamp: datetime

from fennel.connectors import source, Kafka
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib.params import inputs

# docsnip-highlight start
snowflake = Snowflake(
name="my_snowflake",
account="VPECCVJ-MUB03765",
warehouse="TEST",
db_name=os.environ["DB_NAME"],
schema="PUBLIC",
role="ACCOUNTADMIN",
username=os.environ["SNOWFLAKE_USERNAME"],
password=os.environ["SNOWFLAKE_PASSWORD"],
)
# docsnip-highlight end

# docsnip basic
from fennel.connectors import sink

@dataset
@sink(
snowflake.table("test_table"),
every="1d",
how="incremental",
renames={"uid": "new_uid"},
) # docsnip-highlight
class SomeDatasetFiltered:
uid: int = field(key=True)
email: str
timestamp: datetime

@pipeline
@inputs(SomeDataset)
def gmail_filtered(cls, dataset: Dataset):
return dataset.filter(
lambda row: row["email"].contains("gmail.com")
)

# /docsnip

client.commit(
message="some commit msg", datasets=[SomeDataset, SomeDatasetFiltered]
)
2 changes: 1 addition & 1 deletion docs/pages/api-reference/client/log.md
@@ -5,7 +5,7 @@ status: published
---
### Log

Method to push data into Fennel datasets via [webhook endpoints](/api-reference/connectors/webhook).
Method to push data into Fennel datasets via [webhook endpoints](/api-reference/source_connectors/webhook).

#### Parameters
<Expandable title="webhook" type="str">
4 changes: 2 additions & 2 deletions docs/pages/api-reference/client/query-offline.md
@@ -32,7 +32,7 @@ This parameter is mutually exclusive with `input_s3`.
<Expandable title="input_s3" type="Optional[connectors.S3]">
Sending large volumes of the input data over the wire is often infeasible.
In such cases, input data can be written to S3 and the location of the file is
sent as `input_s3` via `S3.bucket()` function of [S3](/api-reference/connectors/s3)
sent as `input_s3` via `S3.bucket()` function of [S3](/api-reference/source_connectors/s3)
connector.

When using this option, please ensure that Fennel's data connector
@@ -49,7 +49,7 @@ must be computed.
<Expandable title="output_s3" type="Optional[connectors.S3]">
Specifies the location & other details about the s3 path where the values of
all the output features should be written. Similar to `input_s3`, this is
provided via `S3.bucket()` function of [S3](/api-reference/connectors/s3) connector.
provided via `S3.bucket()` function of [S3](/api-reference/source_connectors/s3) connector.

If this isn't provided, Fennel writes the results of all requests to a fixed
default bucket - you can see its details from the return value of `query_offline`
8 changes: 4 additions & 4 deletions docs/pages/api-reference/decorators/source.md
@@ -67,14 +67,14 @@ over `"upsert"`.

`"native"` means that the underlying system exposes CDC natively and that Fennel
should tap into that. As of right now, native CDC is only available for
[Deltalake](/api-reference/connectors/deltalake) & [Hudi](/api-reference/connectors/hudi)
[Deltalake](/api-reference/source_connectors/deltalake) & [Hudi](/api-reference/source_connectors/hudi)
and will soon be available for more sources including MySQL and Postgres.

`"debezium"` means that the raw data itself is laid out in debezium layout out
of which valid CDC data can be constructed. This is only possible for sources
that expose raw schemaless data, namely, [s3](/api-reference/connectors/s3),
[kinesis](/api-reference/connectors/kinesis), [kafka](/api-reference/connectors/kafka),
and [webhook](/api-reference/connectors/webhook).
that expose raw schemaless data, namely, [s3](/api-reference/source_connectors/s3),
[kinesis](/api-reference/source_connectors/kinesis), [kafka](/api-reference/source_connectors/kafka),
and [webhook](/api-reference/source_connectors/webhook).
</Expandable>
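
For illustration, a minimal sketch (not part of this diff) of what a `cdc="native"` source could look like for a delta-format table in S3; the bucket, prefix, and schedule values are placeholder assumptions:

```python
from datetime import datetime

from fennel.connectors import S3, source
from fennel.datasets import dataset, field

# Placeholder account; keys omitted, assuming role-based access.
s3 = S3(name="my_s3")

# A Deltalake table exposes CDC natively, hence cdc="native".
@source(
    s3.bucket("my-bucket", prefix="user-data", format="delta"),
    every="1h",
    disorder="14d",
    cdc="native",
)
@dataset
class UserFromDelta:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
```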

<Expandable title="env" type="None | str | List[str]" defaultVal="None">
2 changes: 1 addition & 1 deletion docs/pages/api-reference/rest-api/log.md
@@ -6,7 +6,7 @@ status: published

`POST /api/v1/log`
### Log
Method to push data into Fennel datasets via [webhook endpoints](/api-reference/connectors/webhook)
Method to push data into Fennel datasets via [webhook endpoints](/api-reference/source_connectors/webhook)
via REST API.


66 changes: 66 additions & 0 deletions docs/pages/api-reference/sink_connectors/kafka.md
@@ -0,0 +1,66 @@
---
title: Kafka
order: 0
status: published
---
### Kafka
Data sink to any data store that speaks the Kafka protocol (e.g. native
Kafka, MSK, Redpanda, etc.).

#### Cluster Parameters
<Expandable title="name" type="str">
A name to identify the sink. This name should be unique across ALL connectors.
</Expandable>

<Expandable title="bootstrap_servers" type="str">
This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka
cluster that a Kafka client connects to initially to bootstrap itself and discover
the rest of the brokers in the cluster.

Addresses are written as host & port pairs and can be specified either as a
single server (e.g. `localhost:9092`) or a comma separated list of several
servers (e.g. `localhost:9092,another.host:9092`).
</Expandable>

<Expandable title="security_protocol" type='"PLAINTEXT" | "SASL_PLAINTEXT" | "SASL_SSL"'>
Protocol used to communicate with the brokers.
</Expandable>

<Expandable title="sasl_mechanism" type='"PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512" | "GSSAPI"'>
SASL mechanism to use for authentication.
</Expandable>

<Expandable title="sasl_plain_username" type="Optional[str]">
SASL username.
</Expandable>

<Expandable title="sasl_plain_password" type="Optional[str]">
SASL password.
</Expandable>

#### Topic Parameters

<Expandable title="topic" type="str">
The name of the Kafka topic to which the dataset should be sinked.
</Expandable>

<pre snippet="api-reference/sinks/kafka_sink#basic"
status="success" message="Capturing change from a dataset to a Kafka sink"
></pre>
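
For illustration, here is a minimal sketch of how such a sink could be wired up in Python, modeled on the Snowflake sink example added in this commit. The topic names, dataset names, credentials, and the `cdc="debezium"` argument are illustrative assumptions, not verbatim from this diff:

```python
import os
from datetime import datetime

from fennel.connectors import Kafka, sink, source
from fennel.datasets import Dataset, dataset, field, pipeline
from fennel.lib.params import inputs

# Cluster-level parameters; all values below are placeholders.
kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

@source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

# Changes to this derived dataset are emitted to the "user_filtered"
# topic as JSON Debezium records (see the info note below).
@dataset
@sink(kafka.topic("user_filtered"), cdc="debezium")
class UserFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def gmail_filtered(cls, user: Dataset):
        return user.filter(lambda row: row["email"].contains("gmail.com"))
```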

#### Errors
<Expandable title="Connectivity problems">
The Fennel server tries to connect with the Kafka broker during the `commit` operation
itself to validate connectivity - as a result, an incorrect URL, username, or password
will be caught as an error at commit time.

Note: The mock client cannot talk to any external data sink and hence is unable to
do this validation at commit time.
</Expandable>

:::info
- Fennel supports Kafka sinks only with the JSON Debezium format. Given the ubiquity
of Debezium connectors, you should be able to further pipe this Debezium data
from Kafka to your data store of choice. In case you require support for
other formats, please reach out to Fennel support.
:::
89 changes: 89 additions & 0 deletions docs/pages/api-reference/sink_connectors/s3.md
@@ -0,0 +1,89 @@
---
title: S3
order: 0
status: published
---
### S3
Data connector to sink data to S3.

#### Account Parameters
<Expandable title="name" type="str">
A name to identify the sink. The name should be unique across all Fennel connectors.
</Expandable>

<Expandable title="aws_access_key_id" type="Optional[str]" defaultVal="None">
AWS Access Key ID. This field is not required if role-based access is used or if
the bucket is public.
</Expandable>

<Expandable title="aws_secrete_access_key" type="Optional[str]" defaultVal="None">
AWS Secret Access Key. This field is not required if role-based access is used
or if the bucket is public.
</Expandable>

<Expandable title="role_arn" type="Optional[str]" defaultVal="None">
Role ARN to assume to get access to S3. This field is not required if role-based access is used
or if AWS access and secret keys are used or if the bucket is public.
</Expandable>


#### Bucket Parameters
<Expandable title="bucket" type="str">
The name of the S3 bucket where the data files have to be sinked.
</Expandable>

<Expandable title="prefix" type="Optional[str]" defaultVal="None">
The prefix of the bucket (as a relative path within the bucket) where the data files
should be sinked. For instance, `some-folder/` or `A/B/C` are all valid prefixes. The prefix
cannot have any wildcard characters.
</Expandable>

<Expandable title="format" type="str" defaultVal="csv">
The format of the files you'd like to sink. Valid values are "csv", "parquet",
"json", ["delta"](/api-reference/source_connectors/deltalake) or ["hudi"](/api-reference/source_connectors/hudi).
</Expandable>

<Expandable title="delimiter" type="Optional[str]" defaultVal=",">
The character delimiting individual cells in the CSV data - only relevant when
the format is `csv`; otherwise it's ignored.

The default value is `","` and can be overridden by any other one-character string.
For example, to use tab-delimited data, enter `"\t"`.
</Expandable>

<pre snippet="api-reference/sinks/s3_sink#basic"
status="success" message="S3 sink">
</pre>
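
For illustration, a minimal sketch of an S3 sink declaration in Python, following the same decorator pattern as the Snowflake sink example added in this commit. The bucket, prefix, credentials, and the webhook-backed upstream dataset are placeholder assumptions; `format="delta"` reflects the delta-only support noted below:

```python
import os
from datetime import datetime

from fennel.connectors import S3, Webhook, sink, source
from fennel.datasets import Dataset, dataset, field, pipeline
from fennel.lib.params import inputs

# A webhook source stands in for whatever upstream dataset you already have.
webhook = Webhook(name="fennel_webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

# Account-level parameters; all values below are placeholders.
s3 = S3(
    name="my_s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

# Keyed dataset sinked to s3://my-bucket/some-folder/<generated suffix>
# in delta format, refreshed incrementally once a day.
@dataset
@sink(
    s3.bucket("my-bucket", prefix="some-folder", format="delta"),
    every="1d",
    how="incremental",
)
class UserFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def gmail_filtered(cls, user: Dataset):
        return user.filter(lambda row: row["email"].contains("gmail.com"))
```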

#### Errors
<Expandable title="Connectivity or authentication errors">
The Fennel server tries to do some lightweight operations on the bucket during the `commit`
operation - all connectivity or authentication related errors should be caught
during the commit itself.

Note: The mock client cannot talk to any external data sink and hence is unable to
do this validation at commit time.
</Expandable>

#### Enabling IAM Access
Fennel creates a role whose name is prefixed by `FennelDataAccessRole-` in
your AWS account for role-based access. In order to use IAM access for S3, please
ensure that this role has permission to read and list files on the buckets of
interest.

With that ready, simply don't specify `aws_access_key_id` and
`aws_secret_access_key` and Fennel will automatically fall back to IAM based
access.

In case you do not want to grant S3 access to `FennelDataAccessRole-`, pass the `role_arn`
parameter inside the connector params and make sure `FennelDataAccessRole-` can assume that IAM role.
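
A short sketch of that variant, assuming the same `S3` connector; the ARN below is a placeholder for a role that `FennelDataAccessRole-` is allowed to assume:

```python
from fennel.connectors import S3

# No access/secret keys: Fennel falls back to role-based access and
# assumes the placeholder role below through FennelDataAccessRole-.
s3 = S3(
    name="my_s3_via_role",
    role_arn="arn:aws:iam::123456789012:role/my-s3-access-role",
)
```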

:::info
- Fennel appends a generated suffix to the prefix provided above to avoid ambiguities when the
sink dataset version is updated or when multiple branches have the same sink defined. This suffix
can be found in the 'Sink' tab of the console after initiating a data sink.
- Fennel supports S3 sinks only with the delta format and with access to S3 through `FennelDataAccessRole-`.
In case you require support for other formats or access mechanisms, please reach out to Fennel support.
- Currently, Fennel only supports keyed dataset sinks to S3. Support for keyless datasets will be added soon.
:::
