From b9fa39ba9a9c1af58a5268138103d1075490d638 Mon Sep 17 00:00:00 2001 From: Vellanki Sai Harsha Date: Mon, 14 Oct 2024 10:39:15 +0530 Subject: [PATCH] Add support for Keyed Snowflake Sink (#582) --- .wordlist.txt | 1 + docs/api.yml | 41 +++++---- .../api-reference/sinks/snowflake_sink.py | 80 +++++++++++++++++ docs/pages/api-reference/client/log.md | 2 +- .../api-reference/client/query-offline.md | 4 +- docs/pages/api-reference/decorators/source.md | 8 +- docs/pages/api-reference/rest-api/log.md | 2 +- .../api-reference/sink_connectors/kafka.md | 66 ++++++++++++++ .../pages/api-reference/sink_connectors/s3.md | 89 +++++++++++++++++++ .../sink_connectors/snowflake.md | 81 +++++++++++++++++ .../{connectors => source_connectors}/avro.md | 0 .../bigquery.md | 2 +- .../deltalake.md | 0 .../{connectors => source_connectors}/hudi.md | 0 .../kafka.md | 8 +- .../kinesis.md | 4 +- .../mongo.md | 2 +- .../mysql.md | 2 +- .../postgres.md | 2 +- .../protobuf.md | 0 .../pubsub.md | 2 +- .../redshift.md | 2 +- .../{connectors => source_connectors}/s3.md | 7 +- .../snowflake.md | 2 +- .../webhook.md | 0 docs/pages/concepts/introduction.md | 2 +- docs/pages/concepts/sink.md | 8 +- docs/pages/concepts/source.md | 2 +- docs/pages/development/unit-testing.md | 2 +- fennel/CHANGELOG.md | 3 + fennel/connectors/connectors.py | 38 ++++++-- fennel/connectors/test_connectors.py | 49 ++++++++++ fennel/connectors/test_invalid_connectors.py | 40 ++++++++- fennel/internal_lib/to_proto/to_proto.py | 68 ++++++++++++++ pyproject.toml | 2 +- 35 files changed, 555 insertions(+), 66 deletions(-) create mode 100644 docs/examples/api-reference/sinks/snowflake_sink.py create mode 100644 docs/pages/api-reference/sink_connectors/kafka.md create mode 100644 docs/pages/api-reference/sink_connectors/s3.md create mode 100644 docs/pages/api-reference/sink_connectors/snowflake.md rename docs/pages/api-reference/{connectors => source_connectors}/avro.md (100%) rename docs/pages/api-reference/{connectors => source_connectors}/bigquery.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/deltalake.md (100%) rename docs/pages/api-reference/{connectors => source_connectors}/hudi.md (100%) rename docs/pages/api-reference/{connectors => source_connectors}/kafka.md (90%) rename docs/pages/api-reference/{connectors => source_connectors}/kinesis.md (97%) rename docs/pages/api-reference/{connectors => source_connectors}/mongo.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/mysql.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/postgres.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/protobuf.md (100%) rename docs/pages/api-reference/{connectors => source_connectors}/pubsub.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/redshift.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/s3.md (96%) rename docs/pages/api-reference/{connectors => source_connectors}/snowflake.md (99%) rename docs/pages/api-reference/{connectors => source_connectors}/webhook.md (100%) diff --git a/.wordlist.txt b/.wordlist.txt index 1f61ba1cb..07847deae 100644 --- a/.wordlist.txt +++ b/.wordlist.txt @@ -340,6 +340,7 @@ schemas sdk signup signups +sinked snapshotted snowflakecomputing src diff --git a/docs/api.yml b/docs/api.yml index b64917e08..989a49c64 100644 --- a/docs/api.yml +++ b/docs/api.yml @@ -50,24 +50,31 @@ sidebar: - "api-reference/operators/transform" - "api-reference/operators/union" - - slug: 
"api-reference/connectors" - title: "Connectors" + - slug: "api-reference/source_connectors" + title: "Source Connectors" pages: - - "api-reference/connectors/avro" - - "api-reference/connectors/protobuf" - - "api-reference/connectors/bigquery" - - "api-reference/connectors/deltalake" - - "api-reference/connectors/hudi" - - "api-reference/connectors/kafka" - - "api-reference/connectors/kinesis" - - "api-reference/connectors/mongo" - - "api-reference/connectors/mysql" - - "api-reference/connectors/postgres" - - "api-reference/connectors/pubsub" - - "api-reference/connectors/redshift" - - "api-reference/connectors/s3" - - "api-reference/connectors/snowflake" - - "api-reference/connectors/webhook" + - "api-reference/source_connectors/avro" + - "api-reference/source_connectors/protobuf" + - "api-reference/source_connectors/bigquery" + - "api-reference/source_connectors/deltalake" + - "api-reference/source_connectors/hudi" + - "api-reference/source_connectors/kafka" + - "api-reference/source_connectors/kinesis" + - "api-reference/source_connectors/mongo" + - "api-reference/source_connectors/mysql" + - "api-reference/source_connectors/postgres" + - "api-reference/source_connectors/pubsub" + - "api-reference/source_connectors/redshift" + - "api-reference/source_connectors/s3" + - "api-reference/source_connectors/snowflake" + - "api-reference/source_connectors/webhook" + + - slug: "api-reference/sink_connectors" + title: "Sink Connectors" + pages: + - "api-reference/sink_connectors/kafka" + - "api-reference/sink_connectors/s3" + - "api-reference/sink_connectors/snowflake" - slug: "api-reference/aggregations" title: "Aggregations" diff --git a/docs/examples/api-reference/sinks/snowflake_sink.py b/docs/examples/api-reference/sinks/snowflake_sink.py new file mode 100644 index 000000000..72b5cfa1a --- /dev/null +++ b/docs/examples/api-reference/sinks/snowflake_sink.py @@ -0,0 +1,80 @@ +import os +from datetime import datetime + +from fennel.testing import mock + +__owner__ = "saiharsha@fennel.ai" + + +@mock +def test_snowflake_sink(client): + os.environ["KAFKA_USERNAME"] = "test" + os.environ["KAFKA_PASSWORD"] = "test" + os.environ["SNOWFLAKE_USERNAME"] = "some-name" + os.environ["SNOWFLAKE_PASSWORD"] = "some-password" + os.environ["DB_NAME"] = "some-db-name" + + from fennel.connectors import source, Kafka, Snowflake + from fennel.datasets import dataset, field + + kafka = Kafka( + name="my_kafka", + bootstrap_servers="localhost:9092", # could come via os env var too + security_protocol="SASL_PLAINTEXT", + sasl_mechanism="PLAIN", + sasl_plain_username=os.environ["KAFKA_USERNAME"], + sasl_plain_password=os.environ["KAFKA_PASSWORD"], + ) + + # docsnip-highlight next-line + @source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert") + @dataset + class SomeDataset: + uid: int = field(key=True) + email: str + timestamp: datetime + + from fennel.connectors import source, Kafka + from fennel.datasets import dataset, field, pipeline, Dataset + from fennel.lib.params import inputs + + # docsnip-highlight start + snowflake = Snowflake( + name="my_snowflake", + account="VPECCVJ-MUB03765", + warehouse="TEST", + db_name=os.environ["DB_NAME"], + schema="PUBLIC", + role="ACCOUNTADMIN", + username=os.environ["SNOWFLAKE_USERNAME"], + password=os.environ["SNOWFLAKE_PASSWORD"], + ) + # docsnip-highlight end + + # docsnip basic + from fennel.connectors import sink + + @dataset + @sink( + snowflake.table("test_table"), + every="1d", + how="incremental", + renames={"uid": "new_uid"}, + ) # docsnip-highlight + 
class SomeDatasetFiltered: + uid: int = field(key=True) + email: str + timestamp: datetime + + @pipeline + @inputs(SomeDataset) + def gmail_filtered(cls, dataset: Dataset): + return dataset.filter( + lambda row: row["email"].contains("gmail.com") + ) + + # /docsnip + + client.commit( + message="some commit msg", datasets=[SomeDataset, SomeDatasetFiltered] + ) diff --git a/docs/pages/api-reference/client/log.md b/docs/pages/api-reference/client/log.md index 980f67903..117449548 100644 --- a/docs/pages/api-reference/client/log.md +++ b/docs/pages/api-reference/client/log.md @@ -5,7 +5,7 @@ status: published --- ### Log -Method to push data into Fennel datasets via [webhook endpoints](/api-reference/connectors/webhook). +Method to push data into Fennel datasets via [webhook endpoints](/api-reference/source_connectors/webhook). #### Parameters diff --git a/docs/pages/api-reference/client/query-offline.md b/docs/pages/api-reference/client/query-offline.md index 60129f4a2..68cc7268e 100644 --- a/docs/pages/api-reference/client/query-offline.md +++ b/docs/pages/api-reference/client/query-offline.md @@ -32,7 +32,7 @@ This parameter is mutually exclusive with `input_s3`. Sending large volumes of the input data over the wire is often infeasible. In such cases, input data can be written to S3 and the location of the file is -sent as `input_s3` via `S3.bucket()` function of [S3](/api-reference/connectors/s3) +sent as `input_s3` via `S3.bucket()` function of [S3](/api-reference/source_connectors/s3) connector. When using this option, please ensure that Fennel's data connector @@ -49,7 +49,7 @@ must be computed. Specifies the location & other details about the s3 path where the values of all the output features should be written. Similar to `input_s3`, this is -provided via `S3.bucket()` function of [S3](/api-reference/connectors/s3) connector. +provided via `S3.bucket()` function of [S3](/api-reference/source_connectors/s3) connector. If this isn't provided, Fennel writes the results of all requests to a fixed default bucket - you can see its details from the return value of `query_offline` diff --git a/docs/pages/api-reference/decorators/source.md b/docs/pages/api-reference/decorators/source.md index 1978cdc78..3953f61d7 100644 --- a/docs/pages/api-reference/decorators/source.md +++ b/docs/pages/api-reference/decorators/source.md @@ -67,14 +67,14 @@ over `"upsert"`. `"native"` means that the underlying system exposes CDC natively and that Fennel should tap into that. As of right now, native CDC is only available for -[Deltalake](/api-reference/connectors/deltalake) & [Hudi](/api-reference/connectors/hudi) +[Deltalake](/api-reference/source_connectors/deltalake) & [Hudi](/api-reference/source_connectors/hudi) and will soon be available for more sources including MySQL and Postgres. `"debezium"` means that the raw data itself is laid out in debezium layout out of which valid CDC data can be constructed. This is only possible for sources -that expose raw schemaless data, namely, [s3](/api-reference/connectors/s3), -[kinesis](/api-reference/connectors/kinesis), [kafka](/api-reference/connectors/kafka), -and [webhook](/api-reference/connectors/webhook). +that expose raw schemaless data, namely, [s3](/api-reference/source_connectors/s3), +[kinesis](/api-reference/source_connectors/kinesis), [kafka](/api-reference/source_connectors/kafka), +and [webhook](/api-reference/source_connectors/webhook). 
diff --git a/docs/pages/api-reference/rest-api/log.md b/docs/pages/api-reference/rest-api/log.md index 05fa08e92..3ef3e57f7 100644 --- a/docs/pages/api-reference/rest-api/log.md +++ b/docs/pages/api-reference/rest-api/log.md @@ -6,7 +6,7 @@ status: published `POST /api/v1/log` ### Log -Method to push data into Fennel datasets via [webhook endpoints](/api-reference/connectors/webhook) +Method to push data into Fennel datasets via [webhook endpoints](/api-reference/source_connectors/webhook) via REST API. diff --git a/docs/pages/api-reference/sink_connectors/kafka.md b/docs/pages/api-reference/sink_connectors/kafka.md new file mode 100644 index 000000000..c5155131c --- /dev/null +++ b/docs/pages/api-reference/sink_connectors/kafka.md @@ -0,0 +1,66 @@ +--- +title: Kafka +order: 0 +status: published +--- +### Kafka +Data sink to any data store that speaks the Kafka protocol (e.g. Native +Kafka, MSK, Redpanda etc.) + +#### Cluster Parameters + +A name to identify the sink. This name should be unique across ALL connectors. + + + +This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka +cluster that a Kafka client connects to initially to bootstrap itself and discover +the rest of the brokers in the cluster. + +Addresses are written as host & port pairs and can be specified either as a +single server (e.g. `localhost:9092`) or a comma separated list of several +servers (e.g. `localhost:9092,another.host:9092`). + + + +Protocol used to communicate with the brokers. + + + +SASL mechanism to use for authentication. + + + +SASL username. + + + +SASL password. + + +#### Topic Parameters + + +The name of the kafka topic that needs to be sinked. + + +

+
+#### Errors
+
+Fennel server tries to connect with the Kafka broker during the `commit` operation
+itself to validate connectivity - as a result, an incorrect URL, username or
+password is caught as an error at commit time.
+
+Note: The mock client cannot talk to any external data sink and hence is unable
+to do this validation at commit time.
+
+
+:::info
+- Fennel supports Kafka sinks only with the JSON debezium format. Given the ubiquity
+of debezium connectors, you should be able to further pipe this debezium data
+from Kafka to your data store of choice. If you require support for
+other formats, please reach out to Fennel support.
+:::
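+
+For reference, a minimal sketch of a Kafka sink on a keyed dataset - the cluster
+credentials, topic name and dataset below are illustrative placeholders, and the
+pipeline that populates the dataset is elided:
+
+```python
+from datetime import datetime
+
+from fennel.connectors import sink, Kafka
+from fennel.datasets import dataset, field
+
+kafka = Kafka(
+    name="my_kafka",
+    bootstrap_servers="localhost:9092",
+    security_protocol="SASL_PLAINTEXT",
+    sasl_mechanism="PLAIN",
+    sasl_plain_username="username",
+    sasl_plain_password="password",
+)
+
+# changes to this dataset are published to the topic in JSON debezium format
+@dataset
+@sink(kafka.topic("user_filtered", format="json"), cdc="debezium")
+class UserFiltered:
+    uid: int = field(key=True)
+    email: str
+    timestamp: datetime
+```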
diff --git a/docs/pages/api-reference/sink_connectors/s3.md b/docs/pages/api-reference/sink_connectors/s3.md
new file mode 100644
index 000000000..9370d6f34
--- /dev/null
+++ b/docs/pages/api-reference/sink_connectors/s3.md
@@ -0,0 +1,89 @@
+---
+title: S3
+order: 0
+status: published
+---
+### S3
+Data connector to sink data to S3.
+
+#### Account Parameters
+
+A name to identify the sink. The name should be unique across all Fennel connectors.
+
+
+
+AWS Access Key ID. This field is not required if role-based access is used or if
+the bucket is public.
+
+
+
+AWS Secret Access Key. This field is not required if role-based access is used 
+or if the bucket is public.
+
+
+
+Role ARN to assume to get access to S3. This field is not required if role-based
+access is used, if AWS access and secret keys are used, or if the bucket is public.
+
+
+
+#### Bucket Parameters
+
+The name of the S3 bucket where the data files have to be sinked.
+
+
+
+The prefix of the bucket (as a relative path within the bucket) where the data files
+should be sinked. For instance, `some-folder/` or `A/B/C` are valid prefixes. The
+prefix cannot have any wildcard characters.
+
+
+
+The format of the files you'd like to sink. Valid values are "csv", "parquet", 
+"json", ["delta"](/api-reference/source_connectors/deltalake) or ["hudi"](/api-reference/source_connectors/hudi).
+
+
+
+The character delimiting individual cells in the CSV data - only relevant when
+format is `csv`, otherwise it's ignored.
+
+The default value is `","` and can be overridden by any other 1-character string.
+For example, to use tab-delimited data enter `"\t"`.
+
+
+
+
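+
+As an illustration, a minimal sketch of an S3 sink using role-based access - the
+bucket, prefix and dataset below are placeholders, the `S3.bucket()` arguments are
+assumed to mirror the S3 source connector, and the pipeline populating the dataset
+is elided:
+
+```python
+from datetime import datetime
+
+from fennel.connectors import sink, S3
+from fennel.datasets import dataset, field
+
+# no access/secret keys given: Fennel falls back to IAM access
+# via the FennelDataAccessRole- role
+s3 = S3(name="my_s3")
+
+@dataset
+@sink(
+    s3.bucket("datalake", prefix="user_filtered", format="delta"),
+    every="1d",
+    how="incremental",
+    renames={"uid": "new_uid"},
+)
+class UserFiltered:
+    uid: int = field(key=True)
+    email: str
+    timestamp: datetime
+```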
+ +#### Errors + +Fennel server try to do some lightweight operations on the bucket during the commit +operation - all connectivity or authentication related errors should be caught +during the commit itself. + +Note: Mock client can not talk to any external data sink and hence is unable to +do this validation at commit time. + + +#### Enabling IAM Access +Fennel creates a role with name prefixed by `FennelDataAccessRole-` in +your AWS account for role-based access. In order to use IAM access for S3, please +ensure that this role has access to read and do list files on the buckets of +interest. + +With that ready, simply don't specify `aws_access_key_id` and +`aws_secret_access_key` and Fennel will automatically fall back to IAM based +access. + +In case you do not want to provide S3 access to `FennelDataAccessRole-`, pass `role_arn` +parameter inside connector params and make sure `FennelDataAccessRole-` can assume that IAM role + +:::info +- Fennel appends a generated suffix to the above provided prefix to avoid ambiguities when the +sink dataset version is updated or when multiple branches have the same sink defined. This suffix +can be found in the 'Sink' tab of the console after initiating a data sink. +- Fennel supports S3 sink with only delta format and access to S3 through `FennelDataAccessRole-`. +In case you require support for other formats or access mechanisms, please reach out to Fennel support +- Currently, Fennel only supports keyed dataset sinks to S3. Support for keyless datasets will be added soon. +::: + diff --git a/docs/pages/api-reference/sink_connectors/snowflake.md b/docs/pages/api-reference/sink_connectors/snowflake.md new file mode 100644 index 000000000..d6a73ddc4 --- /dev/null +++ b/docs/pages/api-reference/sink_connectors/snowflake.md @@ -0,0 +1,81 @@ +--- +title: Snowflake +order: 0 +status: published +--- +### Snowflake +Data sink to Snowflake databases. + +#### Database Parameters + +A name to identify the sink. The name should be unique across all Fennel connectors. + + + + +Snowflake account identifier. This is the first part of the URL used to access +Snowflake. For example, if the URL is `https://.snowflakecomputing.com`, +then the account is ``. + +This is usually of the form `-`. Refer to the +[Snowflake documentation](https://docs.snowflake.com/en/user-guide/admin-account-identifier#finding-the-organization-and-account-name-for-an-account) +to find the account identifier. + + + +The role that should be used by Fennel to access Snowflake. + + + +The warehouse that should be used to access Snowflake. + + + +The name of the database where the data has to be sinked. + + + +The schema where the required data has to be sinked. + + + +The username which should be used to access Snowflake. This username should +have required permissions to assume the provided `role`. + + + +The password associated with the username. + + +#### Table Parameters + +The prefix of the table within the database to which the data should be sinked. + + +
+
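+
+For illustration, a minimal sketch of a keyed dataset sinked to a Snowflake table -
+the account, credentials and table name below are placeholders, and `SomeDataset`
+is assumed to be a keyed dataset sourced elsewhere:
+
+```python
+from datetime import datetime
+
+from fennel.connectors import sink, Snowflake
+from fennel.datasets import dataset, field, pipeline, Dataset
+from fennel.lib.params import inputs
+
+snowflake = Snowflake(
+    name="my_snowflake",
+    account="VPECCVJ-MUB03765",
+    warehouse="TEST",
+    db_name="SOME_DB",
+    schema="PUBLIC",
+    role="ACCOUNTADMIN",
+    username="some-username",
+    password="some-password",
+)
+
+@dataset
+@sink(
+    snowflake.table("test_table"),  # table prefix to sink into
+    every="1d",                     # batch cadence of the sink
+    how="incremental",              # only incremental style is supported
+    renames={"uid": "new_uid"},     # optional column renames
+)
+class SomeDatasetFiltered:
+    uid: int = field(key=True)
+    email: str
+    timestamp: datetime
+
+    @pipeline
+    @inputs(SomeDataset)
+    def gmail_filtered(cls, dataset: Dataset):
+        return dataset.filter(
+            lambda row: row["email"].contains("gmail.com")
+        )
+```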
+ +#### Errors + +Fennel tries to test the connection with your Snowflake during `commit` itself so any +connectivity issue (e.g. wrong host name, username, password etc) is flagged as +as an error during commit with the real Fennel servers. + +Note: Mock client can not talk to any external data sink and hence is unable to +do this validation at commit time. + + +:::info +- Fennel appends a generated suffix to the provided table name to prevent ambiguities +when the sink dataset version is updated or when multiple branches have the same +sink defined. This suffix can be viewed in the 'Sink' tab of the console after +initiating a data sink. +- Currently, Fennel only supports keyed dataset sinks to Snowflake. Support +for keyless datasets will be added soon. +::: + + + + + diff --git a/docs/pages/api-reference/connectors/avro.md b/docs/pages/api-reference/source_connectors/avro.md similarity index 100% rename from docs/pages/api-reference/connectors/avro.md rename to docs/pages/api-reference/source_connectors/avro.md diff --git a/docs/pages/api-reference/connectors/bigquery.md b/docs/pages/api-reference/source_connectors/bigquery.md similarity index 99% rename from docs/pages/api-reference/connectors/bigquery.md rename to docs/pages/api-reference/source_connectors/bigquery.md index 864897f8f..d40105618 100644 --- a/docs/pages/api-reference/connectors/bigquery.md +++ b/docs/pages/api-reference/source_connectors/bigquery.md @@ -8,7 +8,7 @@ Data connector to Google BigQuery databases. #### Database Parameters -A name to identify the source. The name should be unique across all Fennel sources. +A name to identify the source. The name should be unique across all Fennel connectors. diff --git a/docs/pages/api-reference/connectors/deltalake.md b/docs/pages/api-reference/source_connectors/deltalake.md similarity index 100% rename from docs/pages/api-reference/connectors/deltalake.md rename to docs/pages/api-reference/source_connectors/deltalake.md diff --git a/docs/pages/api-reference/connectors/hudi.md b/docs/pages/api-reference/source_connectors/hudi.md similarity index 100% rename from docs/pages/api-reference/connectors/hudi.md rename to docs/pages/api-reference/source_connectors/hudi.md diff --git a/docs/pages/api-reference/connectors/kafka.md b/docs/pages/api-reference/source_connectors/kafka.md similarity index 90% rename from docs/pages/api-reference/connectors/kafka.md rename to docs/pages/api-reference/source_connectors/kafka.md index ccddc47c2..ccb8a71e3 100644 --- a/docs/pages/api-reference/connectors/kafka.md +++ b/docs/pages/api-reference/source_connectors/kafka.md @@ -9,7 +9,7 @@ Kafka, MSK, Redpanda etc.) #### Cluster Parameters -A name to identify the source. This name should be unique across ALL sources. +A name to identify the source. This name should be unique across ALL connectors. @@ -46,17 +46,13 @@ The name of the kafka topic that needs to be sourced into the dataset. The format of the data in Kafka topic. `"json"`, -[Avro](/api-reference/connectors/avro) and [Protobuf](/api-reference/connectors/protobuf) supported. +[Avro](/api-reference/source_connectors/avro) and [Protobuf](/api-reference/source_connectors/protobuf) supported.

 
-

-
 :::info
 Fennel supports only Append and Upsert mode CDC with data in Protobuf format. If you require support
 for CDC data format, please reach out to Fennel support.
diff --git a/docs/pages/api-reference/connectors/kinesis.md b/docs/pages/api-reference/source_connectors/kinesis.md
similarity index 97%
rename from docs/pages/api-reference/connectors/kinesis.md
rename to docs/pages/api-reference/source_connectors/kinesis.md
index a0d83f126..37367f62a 100644
--- a/docs/pages/api-reference/connectors/kinesis.md
+++ b/docs/pages/api-reference/source_connectors/kinesis.md
@@ -9,7 +9,7 @@ Data connector to ingest data from AWS Kinesis.
 #### Parameters for Defining Source
 
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
@@ -47,7 +47,7 @@ the time of production, not any timestamp field inside the message.
 
 
 The format of the data in the Kinesis stream. Most common value is `"json"` 
-though Fennel also supports [Avro](/api-reference/connectors/avro).
+though Fennel also supports [Avro](/api-reference/source_connectors/avro).
 
 
 #### Errors
diff --git a/docs/pages/api-reference/connectors/mongo.md b/docs/pages/api-reference/source_connectors/mongo.md
similarity index 99%
rename from docs/pages/api-reference/connectors/mongo.md
rename to docs/pages/api-reference/source_connectors/mongo.md
index d180f389a..862752654 100644
--- a/docs/pages/api-reference/connectors/mongo.md
+++ b/docs/pages/api-reference/source_connectors/mongo.md
@@ -8,7 +8,7 @@ Data connector to MongoDB databases.
 
 #### Database Parameters
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
diff --git a/docs/pages/api-reference/connectors/mysql.md b/docs/pages/api-reference/source_connectors/mysql.md
similarity index 99%
rename from docs/pages/api-reference/connectors/mysql.md
rename to docs/pages/api-reference/source_connectors/mysql.md
index abd1d419a..d2877dcb2 100644
--- a/docs/pages/api-reference/connectors/mysql.md
+++ b/docs/pages/api-reference/source_connectors/mysql.md
@@ -8,7 +8,7 @@ Data connector to MySQL databases.
 
 #### Database Parameters
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
diff --git a/docs/pages/api-reference/connectors/postgres.md b/docs/pages/api-reference/source_connectors/postgres.md
similarity index 99%
rename from docs/pages/api-reference/connectors/postgres.md
rename to docs/pages/api-reference/source_connectors/postgres.md
index fcd775b65..02af26329 100644
--- a/docs/pages/api-reference/connectors/postgres.md
+++ b/docs/pages/api-reference/source_connectors/postgres.md
@@ -8,7 +8,7 @@ Data connector to Postgres databases.
 
 #### Database Parameters
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
diff --git a/docs/pages/api-reference/connectors/protobuf.md b/docs/pages/api-reference/source_connectors/protobuf.md
similarity index 100%
rename from docs/pages/api-reference/connectors/protobuf.md
rename to docs/pages/api-reference/source_connectors/protobuf.md
diff --git a/docs/pages/api-reference/connectors/pubsub.md b/docs/pages/api-reference/source_connectors/pubsub.md
similarity index 99%
rename from docs/pages/api-reference/connectors/pubsub.md
rename to docs/pages/api-reference/source_connectors/pubsub.md
index 5e03f0f00..9f8df6986 100644
--- a/docs/pages/api-reference/connectors/pubsub.md
+++ b/docs/pages/api-reference/source_connectors/pubsub.md
@@ -8,7 +8,7 @@ Data connector to Google Pub/Sub messaging service.
 
 #### Project Parameters
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
diff --git a/docs/pages/api-reference/connectors/redshift.md b/docs/pages/api-reference/source_connectors/redshift.md
similarity index 99%
rename from docs/pages/api-reference/connectors/redshift.md
rename to docs/pages/api-reference/source_connectors/redshift.md
index 335b49f42..d9d80d16a 100644
--- a/docs/pages/api-reference/connectors/redshift.md
+++ b/docs/pages/api-reference/source_connectors/redshift.md
@@ -8,7 +8,7 @@ Data connector to Redshift databases.
 
 #### Database Parameters
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
diff --git a/docs/pages/api-reference/connectors/s3.md b/docs/pages/api-reference/source_connectors/s3.md
similarity index 96%
rename from docs/pages/api-reference/connectors/s3.md
rename to docs/pages/api-reference/source_connectors/s3.md
index 856f41968..ca9e0b61f 100644
--- a/docs/pages/api-reference/connectors/s3.md
+++ b/docs/pages/api-reference/source_connectors/s3.md
@@ -8,7 +8,7 @@ Data connector to source data from S3.
 
 #### Account Parameters
 
-A name to identify the source. The name should be unique across all Fennel sources.
+A name to identify the source. The name should be unique across all Fennel connectors.
 
 
 
@@ -58,7 +58,7 @@ Exactly one of `prefix` or `path` must be provided.
 
 
 The format of the files you'd like to ingest. Valid values are "csv", "parquet", 
-"json", ["delta"](/api-reference/connectors/deltalake) or ["hudi"](/api-reference/connectors/hudi).
+"json", ["delta"](/api-reference/source_connectors/deltalake) or ["hudi"](/api-reference/source_connectors/hudi).
 
 
 
@@ -102,9 +102,6 @@ this path to have a timestamp between `2024-02-06 18:00:00` and `2024-02-08 06:0
 
 
-
-
#### Errors diff --git a/docs/pages/api-reference/connectors/snowflake.md b/docs/pages/api-reference/source_connectors/snowflake.md similarity index 99% rename from docs/pages/api-reference/connectors/snowflake.md rename to docs/pages/api-reference/source_connectors/snowflake.md index b88850805..15502459b 100644 --- a/docs/pages/api-reference/connectors/snowflake.md +++ b/docs/pages/api-reference/source_connectors/snowflake.md @@ -8,7 +8,7 @@ Data connector to Snowflake databases. #### Database Parameters -A name to identify the source. The name should be unique across all Fennel sources. +A name to identify the source. The name should be unique across all Fennel connectors. diff --git a/docs/pages/api-reference/connectors/webhook.md b/docs/pages/api-reference/source_connectors/webhook.md similarity index 100% rename from docs/pages/api-reference/connectors/webhook.md rename to docs/pages/api-reference/source_connectors/webhook.md diff --git a/docs/pages/concepts/introduction.md b/docs/pages/concepts/introduction.md index d611fea05..9a9bc04e2 100644 --- a/docs/pages/concepts/introduction.md +++ b/docs/pages/concepts/introduction.md @@ -41,7 +41,7 @@ them for now - though if you are interested, you can read about them and sources in general [here](/concepts/source). Besides Postgres and Kafka, Fennel supports connectors with many other sources. -See the [full list](/api-reference/connectors). +See the [full list](/api-reference/source_connectors). ### Pipeline diff --git a/docs/pages/concepts/sink.md b/docs/pages/concepts/sink.md index 0f76b18f0..420777e99 100644 --- a/docs/pages/concepts/sink.md +++ b/docs/pages/concepts/sink.md @@ -24,8 +24,6 @@ the CDC data needs to be written out in the debezium format. That's it - once this is written, `UserLocationFiltered` dataset will start publishing changes to your Kafka. -As of right now, Fennel only supports Kafka sinks and writes data in the debezium -format. Given the ubiquity of debezium connectors, you should be able to further -pipe this debezium data from Kafka to your data store of choice. - -More data stores and cdc strategies will be supported in the future updates. \ No newline at end of file +Fennel ships with data sinks to a couple of [common datastores](/api-reference/sink_connectors) +so that you can 'sink' from your Fennel datasets to your external datasets. +Sinks to many other common data stores will be added soon. \ No newline at end of file diff --git a/docs/pages/concepts/source.md b/docs/pages/concepts/source.md index 531825547..bfffb4e22 100644 --- a/docs/pages/concepts/source.md +++ b/docs/pages/concepts/source.md @@ -9,7 +9,7 @@ status: 'published' Data gets into Fennel datasets via Sources - in fact, sources are the only mechanism for data to reach a Fennel dataset. -Fennel ships with data connectors to all [common datastores](/api-reference/connectors) +Fennel ships with data connectors to all [common datastores](/api-reference/source_connectors) so that you can 'source' your Fennel datasets from your external datasets. In addition to the pull based sources that read from external data sources, Fennel also ships with a push based source called `Webhook` for you to manually push diff --git a/docs/pages/development/unit-testing.md b/docs/pages/development/unit-testing.md index 381093675..09fc154d0 100644 --- a/docs/pages/development/unit-testing.md +++ b/docs/pages/development/unit-testing.md @@ -48,7 +48,7 @@ client - you can commit datasets/featuresets, log data, extract features etc. 
You can bring data to a dataset in the mock server, by using the log function from our testing library or by explicitly logging data to a -[webhook](/api-reference/connectors/webhook). +[webhook](/api-reference/source_connectors/webhook). diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 601c7549e..250058da1 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.39] - 2024-10-14 +- Add support for Snowflake sink + ## [1.5.38] - 2024-10-13 - Add support for use_v2 in query_offline. diff --git a/fennel/connectors/connectors.py b/fennel/connectors/connectors.py index f9820f69f..d20b0cf16 100644 --- a/fennel/connectors/connectors.py +++ b/fennel/connectors/connectors.py @@ -222,11 +222,20 @@ def sink( f"{', '.join(conn.required_fields())}." ) - if not isinstance(conn, KafkaConnector) and not isinstance( - conn, S3Connector + if ( + not isinstance(conn, KafkaConnector) + and not isinstance(conn, S3Connector) + and ( + not isinstance(conn, TableConnector) + or ( + isinstance(conn, TableConnector) + and not isinstance(conn.data_source, Snowflake) + ) + ) ): raise ValueError( - "Sink is only supported for Kafka and S3, found %s" % type(conn) + "Sink is only supported for Kafka, S3 and Snowflake, found %s" + % type(conn), ) if isinstance(conn, KafkaConnector): @@ -255,6 +264,17 @@ def sink( "S3 sink only supports data access through Fennel DataAccess IAM Role" ) + if isinstance(conn, TableConnector): + if isinstance(conn.data_source, Snowflake): + if cdc: + raise ValueError("CDC shouldn't be set for Snowflake sink") + if conn.cursor: + raise ValueError("Cursor shouldn't be set for Snowflake sink") + if how != "incremental": + raise ValueError( + "Only Incremental style supported for Snowflake sink" + ) + def decorator(dataset_cls: T): conn.cdc = cdc conn.every = every @@ -532,11 +552,13 @@ class Snowflake(DataSource): src_schema: str = Field(alias="schema") role: str - def table(self, table_name: str, cursor: str) -> TableConnector: + def table( + self, table_name: str, cursor: Optional[str] = None + ) -> TableConnector: return TableConnector(self, table_name, cursor) def required_fields(self) -> List[str]: - return ["table", "cursor"] + return ["table"] @staticmethod def get(name: str) -> Snowflake: @@ -715,11 +737,11 @@ def identifier(self) -> str: class TableConnector(DataConnector): - """DataConnectors which only need a table name and a cursor to be - specified. Includes BigQuery, MySQL, Postgres, Snowflake and Redshift.""" + """DataConnectors which only need a table name. 
Cursor field should be specified for source + Includes BigQuery, MySQL, Postgres, Snowflake and Redshift.""" table_name: str - cursor: str + cursor: Optional[str] = None def __init__(self, source, table_name, cursor): if isinstance(source, Redshift): diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py index e4de6a84f..9ac07900e 100644 --- a/fennel/connectors/test_connectors.py +++ b/fennel/connectors/test_connectors.py @@ -598,6 +598,12 @@ def test_env_selector_on_connector(): cdc="upsert", env=["prod_new"], ) + @source( + kafka.topic("test_topic"), + disorder="14d", + cdc="upsert", + env=["prod_new2"], + ) @dataset class UserInfoDataset: user_id: int = field(key=True) @@ -628,6 +634,13 @@ class UserInfoDataset: renames={"uid": "new_uid"}, env=["prod_new"], ) + @sink( + snowflake.table("random_table"), + every="1d", + how="incremental", + renames={"uid": "new_uid"}, + env=["prod_new2"], + ) @dataset class UserInfoDatasetDerived: user_id: int = field(key=True) @@ -803,6 +816,42 @@ def create_user_transactions(cls, dataset: Dataset): sink_request, expected_sink_request ) + sync_request = view._get_sync_request_proto(env="prod_new2") + assert len(sync_request.datasets) == 2 + assert len(sync_request.sources) == 1 + assert len(sync_request.sinks) == 1 + assert len(sync_request.extdbs) == 2 + + sink_request = sync_request.sinks[0] + s = { + "table": { + "snowflake_table": { + "db": { + "name": "snowflake_src", + "snowflake": { + "user": "", + "password": "", + "account": "nhb38793.us-west-2.snowflakecomputing.com", + "schema": "PUBLIC", + "warehouse": "TEST", + "role": "ACCOUNTADMIN", + "database": "MOVIELENS", + }, + }, + "table_name": "random_table", + }, + }, + "dataset": "UserInfoDatasetDerived", + "dsVersion": 1, + "every": "86400s", + "how": {"incremental": {}}, + "create": True, + } + expected_sink_request = ParseDict(s, connector_proto.Sink()) + assert sink_request == expected_sink_request, error_message( + sink_request, expected_sink_request + ) + def test_kafka_sink_and_source_doesnt_create_extra_extdbs(): @meta(owner="test@test.com") diff --git a/fennel/connectors/test_invalid_connectors.py b/fennel/connectors/test_invalid_connectors.py index dc9d271c4..5e1404f02 100644 --- a/fennel/connectors/test_invalid_connectors.py +++ b/fennel/connectors/test_invalid_connectors.py @@ -326,7 +326,6 @@ def test_invalid_sink(client): with pytest.raises(Exception) as e: @meta(owner="test@test.com") - @source(kafka.topic("test_topic"), disorder="14d", cdc="append") @sink(kafka.topic("test_topic_2", format="csv"), cdc="debezium") @dataset class UserInfoDataset: @@ -350,7 +349,6 @@ class UserInfoDataset: with pytest.raises(Exception) as e: @meta(owner="test@test.com") - @source(kafka.topic("test_topic"), disorder="14d", cdc="append") @sink(kafka.topic("test_topic_2", format="json"), cdc="native") @dataset class UserInfoDataset: @@ -371,7 +369,6 @@ class UserInfoDataset: with pytest.raises(Exception) as e: @meta(owner="test@test.com") - @source(kafka.topic("test_topic"), disorder="14d", cdc="append") @sink( kinesis.stream( "test_stream", format="json", init_position=datetime(2023, 1, 5) @@ -394,7 +391,7 @@ class UserInfoDataset: assert ( str(e.value) - == "Sink is only supported for Kafka and S3, found " + == "Sink is only supported for Kafka, S3 and Snowflake, found " ) @@ -779,6 +776,41 @@ def test_invalid_protobuf_args(): assert "Either username/password or token should be set" == str(e.value) +def test_invalid_snowflake_batch_sink(): + # CDC passed + with 
pytest.raises(ValueError) as e: + sink( + snowflake.table("test_table"), + every="1h", + cdc="append", + ) + + assert "CDC shouldn't be set for Snowflake sink" == str(e.value) + + # Recreate style passed for how + with pytest.raises(ValueError) as e: + sink( + snowflake.table("test_table"), + every="1h", + how="recreate", + ) + + assert "Only Incremental style supported for Snowflake sink" == str(e.value) + + # Cursor value passed for Snowflake + with pytest.raises(ValueError) as e: + sink( + snowflake.table( + "test_table", + cursor="cursor", + ), + every="1h", + how="incremental", + ) + + assert "Cursor shouldn't be set for Snowflake sink" == str(e.value) + + def test_invalid_s3_batch_sink(): # CDC passed with pytest.raises(ValueError) as e: diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py index 9ffda478f..61426f0e6 100644 --- a/fennel/internal_lib/to_proto/to_proto.py +++ b/fennel/internal_lib/to_proto/to_proto.py @@ -900,6 +900,8 @@ def _conn_to_sink_proto( return _kafka_conn_to_sink_proto(connector, dataset_name, ds_version) if isinstance(connector, connectors.S3Connector): return _s3_conn_to_sink_proto(connector, dataset_name, ds_version) + if isinstance(connector, connectors.TableConnector): + return _table_conn_to_sink_proto(connector, dataset_name, ds_version) else: raise ValueError( f"Unsupported connector type: {type(connector)} for sink" @@ -1225,6 +1227,20 @@ def _s3_to_ext_table_proto( ) +def _table_conn_to_sink_proto( + connector: connectors.TableConnector, + dataset_name: str, + ds_version: int, +) -> Tuple[connector_proto.ExtDatabase, connector_proto.Sink]: + data_source = connector.data_source + if isinstance(data_source, connectors.Snowflake): + return _snowflake_conn_to_sink_proto( + connector, data_source, dataset_name, ds_version + ) + else: + raise ValueError(f"Unknown data sink type: {type(data_source)}") + + def _table_conn_to_source_proto( connector: connectors.TableConnector, dataset: Dataset, @@ -1253,6 +1269,8 @@ def _bigquery_conn_to_source_proto( ) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: if not connector.cdc: raise ValueError("CDC should always be set for BigQuery source") + if connector.cursor is None: + raise ValueError("Cursor should always be set for BigQuery source") service_account_key = connector.data_source.service_account_key if type(service_account_key) is dict: # Convert service_account_key to str defined in proto @@ -1327,6 +1345,8 @@ def _redshift_conn_to_source_proto( ) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: if not connector.cdc: raise ValueError("CDC should always be set for Redshift source") + if connector.cursor is None: + raise ValueError("Cursor should always be set for Redshift source") ext_db = _redshift_to_ext_db_proto( name=data_source.name, s3_access_role_arn=data_source.s3_access_role_arn, @@ -1429,6 +1449,8 @@ def _mongo_conn_to_source_proto( ) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: if not connector.cdc: raise ValueError("CDC should always be set for Mongo source") + if connector.cursor is None: + raise ValueError("Cursor should always be set for Mongo source") ext_db = _mongo_to_ext_db_proto( name=data_source.name, host=data_source.host, @@ -1542,6 +1564,46 @@ def _pubsub_conn_to_source_proto( ) +def _snowflake_conn_to_sink_proto( + connector: connectors.TableConnector, + data_source: connectors.Snowflake, + dataset_name: str, + ds_version: int, +) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: + if 
connector.cdc: + raise ValueError("CDC shouldn't be set for Snowflake sink") + if connector.cursor: + raise ValueError("Cursor shouldn't be set for Snowflake sink") + ext_db = _snowflake_to_ext_db_proto( + name=data_source.name, + account=data_source.account, + user=data_source.username, + password=data_source.password, + schema=data_source.src_schema, + warehouse=data_source.warehouse, + role=data_source.role, + database=data_source.db_name, + ) + ext_table = _snowflake_to_ext_table_proto( + db=ext_db, + table_name=connector.table_name, + ) + cdc = to_cdc_proto(connector.cdc) if connector.cdc else None + sink = connector_proto.Sink( + table=ext_table, + dataset=dataset_name, + ds_version=ds_version, + cdc=cdc, + every=to_duration_proto(connector.every), + how=_how_to_proto(connector.how), + create=connector.create, + renames=_renames_to_proto(connector.renames), + since=_to_timestamp_proto(connector.since), + until=_to_timestamp_proto(connector.until), + ) + return ext_db, sink + + def _snowflake_conn_to_source_proto( connector: connectors.TableConnector, data_source: connectors.Snowflake, @@ -1549,6 +1611,8 @@ def _snowflake_conn_to_source_proto( ) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: if not connector.cdc: raise ValueError("CDC should always be set for Snowflake source") + if connector.cursor is None: + raise ValueError("Cursor should always be set for Snowflake source") ext_db = _snowflake_to_ext_db_proto( name=data_source.name, account=data_source.account, @@ -1653,6 +1717,8 @@ def _mysql_conn_to_source_proto( ) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: if not connector.cdc: raise ValueError("CDC should always be set for Mysql source") + if connector.cursor is None: + raise ValueError("Cursor should always be set for Mysql source") if data_source._get: ext_db = _mysql_ref_to_ext_db_proto(name=data_source.name) else: @@ -1746,6 +1812,8 @@ def _pg_conn_to_source_proto( ) -> Tuple[connector_proto.ExtDatabase, connector_proto.Source]: if not connector.cdc: raise ValueError("CDC should always be set for Postgres source") + if connector.cursor is None: + raise ValueError("Cursor should always be set for Postgres source") if data_source._get: ext_db = _pg_ref_to_ext_db_proto(name=data_source.name) else: diff --git a/pyproject.toml b/pyproject.toml index 36b5b8ec1..d175fa58f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.38" +version = "1.5.39" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]