diff --git a/docs/examples/concepts/sink.py b/docs/examples/concepts/sink.py new file mode 100644 index 00000000..bd3993a1 --- /dev/null +++ b/docs/examples/concepts/sink.py @@ -0,0 +1,84 @@ +import sys +from datetime import datetime + +from fennel.connectors import Kafka, S3, eval, source, sink +from fennel.datasets import dataset +from fennel.expr import col +from fennel.testing import mock + +__owner__ = "owner@example.com" + + +@mock +def test_stacked_sinks(client): + kafka = Kafka( + name="my_kafka", + bootstrap_servers="localhost:9092", + security_protocol="SASL_PLAINTEXT", + sasl_mechanism="PLAIN", + sasl_plain_username="user", + sasl_plain_password="password", + ) + s3 = S3(name="mys3") + cutoff = datetime(2024, 1, 1, 0, 0, 0) + bucket = s3.bucket("data", path="orders") + + # docsnip-highlight start + @source(bucket, disorder="1w", cdc="append", until=cutoff) + @source(kafka.topic("order"), disorder="1d", cdc="append", since=cutoff) + # docsnip-highlight end + @dataset + class UserLocation: + uid: int + city: str + country: str + update_time: datetime + + # docsnip stacked + from fennel.connectors import sink, Kafka, Snowflake + from fennel.datasets import dataset, pipeline, Dataset + from fennel.lib.params import inputs + + kafka = Kafka( + name="kafka_src", + bootstrap_servers=os.environ["KAFKA_HOST"], + security_protocol="PLAINTEXT", + sasl_mechanism="PLAIN", + sasl_plain_username=os.environ["KAFKA_USERNAME"], + sasl_plain_password=os.environ["KAFKA_PASSWORD"], + ) + snowflake = Snowflake( + name="my_snowflake", + account="VPECCVJ-MUB03765", + warehouse="TEST", + db_name=os.environ["DB_NAME"], + schema="PUBLIC", + role="ACCOUNTADMIN", + username=os.environ["SNOWFLAKE_USERNAME"], + password=os.environ["SNOWFLAKE_PASSWORD"], + ) + + # docsnip-highlight start + @sink( + snowflake.table("test_table"), + every="1d", + how="incremental", + renames={"uid": "new_uid"}, + stacked=True, + ) + @sink(kafka.topic("user_location"), cdc="debezium") + # docsnip-highlight end + @dataset + class UserLocationFiltered: + uid: int + city: str + country: str + update_time: datetime + + @pipeline + @inputs(UserLocation) + def user_location_count(cls, dataset: Dataset): + return dataset.filter(lambda row: row["country"] != "United States") + + # /docsnip + client.commit(message="some commit msg", datasets=[Order]) diff --git a/docs/pages/api-reference/decorators/sink.md b/docs/pages/api-reference/decorators/sink.md index b234c215..56b3ec95 100644 --- a/docs/pages/api-reference/decorators/sink.md +++ b/docs/pages/api-reference/decorators/sink.md @@ -62,6 +62,12 @@ This means that the column denoted by the key is aliased to another column in th This is useful, for instance, when you want to rename columns while sinking them. + +This indicates whether the sink is stacked on top of an existing sink. +If multiple sinks are added to the dataset, set this field to True +for all sinks except the first one. + +
diff --git a/docs/pages/concepts/sink.md b/docs/pages/concepts/sink.md
index 420777e9..11e241ca 100644
--- a/docs/pages/concepts/sink.md
+++ b/docs/pages/concepts/sink.md
@@ -26,4 +26,12 @@ publishing changes to your Kafka.
 
 Fennel ships with data sinks to a couple of [common datastores](/api-reference/sink_connectors) 
 so that you can 'sink' from your Fennel datasets to your external datasets.
-Sinks to many other common data stores will be added soon.
\ No newline at end of file
+Sinks to many other common data stores will be added soon.
+
+## Stacking Multiple Sinks
+Stacking two (or more) sinks on top of a dataset simply leads to the dataset
+getting sinked to all of them.
+
+

\ No newline at end of file
diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md
index ffa25def..3ef30b1b 100644
--- a/fennel/CHANGELOG.md
+++ b/fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## [1.5.59] - 2024-12-06
+- Add support for stacked sinks
+
 ## [1.5.58] - 2024-11-24
 - Allow min/max aggregation on date, datetime and decimal dtypes
 
diff --git a/fennel/connectors/connectors.py b/fennel/connectors/connectors.py
index a5c65c61..f52d9bf2 100644
--- a/fennel/connectors/connectors.py
+++ b/fennel/connectors/connectors.py
@@ -252,6 +252,7 @@ def sink(
     since: Optional[datetime] = None,
     until: Optional[datetime] = None,
     env: Optional[Union[str, List[str]]] = None,
+    stacked: Optional[bool] = None,
 ) -> Callable[[T], Any]:
     if not isinstance(conn, DataConnector):
         if not isinstance(conn, DataSource):
@@ -352,6 +353,7 @@ def decorator(dataset_cls: T):
         conn.renames = renames
         conn.since = since
         conn.until = until
+        conn.stacked = stacked
         conn.envs = EnvSelector(env)
         connectors = getattr(dataset_cls, SINK_FIELD, [])
         connectors.append(conn)
@@ -821,6 +823,7 @@ class DataConnector:
     how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
     create: Optional[bool] = None
     renames: Optional[Dict[str, str]] = {}
+    stacked: Optional[bool] = None
 
     def identifier(self):
         raise NotImplementedError
diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py
index d4f008b8..e03e5a07 100644
--- a/fennel/connectors/test_connectors.py
+++ b/fennel/connectors/test_connectors.py
@@ -680,6 +680,14 @@ class UserInfoDataset:
         cdc="debezium",
         env=["prod_new4"],
     )
+    @sink(
+        http_with_secret.path(
+            endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
+        ),
+        cdc="debezium",
+        env=["prod_new4"],
+        stacked=True,
+    )
     @dataset
     class UserInfoDatasetDerived:
         user_id: int = field(key=True)
@@ -934,10 +942,10 @@ def create_user_transactions(cls, dataset: Dataset):
     sync_request = view._get_sync_request_proto(env="prod_new4")
     assert len(sync_request.datasets) == 2
     assert len(sync_request.sources) == 1
-    assert len(sync_request.sinks) == 1
+    assert len(sync_request.sinks) == 2
     assert len(sync_request.extdbs) == 2
 
-    sink_request = sync_request.sinks[0]
+    sink_request = sync_request.sinks[1]
     s = {
         "table": {
             "http_path": {
@@ -975,6 +983,45 @@ def create_user_transactions(cls, dataset: Dataset):
         sink_request, expected_sink_request
     )
 
+    sink_request = sync_request.sinks[0]
+    s = {
+        "table": {
+            "http_path": {
+                "db": {
+                    "name": "http_sink_with_secret",
+                    "http": {
+                        "host_secret": {
+                            "secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
+                            "role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
+                            "path": ["http_host"],
+                        },
+                        "healthz": "/health",
+                        "ca_cert": {
+                            "secret_ref": {
+                                "secret_arn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:fennel-test-secret-1",
+                                "role_arn": "arn:aws:iam::123456789012:role/fennel-test-role",
+                                "path": ["ca_cert"],
+                            },
+                        },
+                    },
+                },
+                "endpoint": "/sink3",
+                "limit": 100,
+                "headers": {
+                    "Foo": "Bar",
+                },
+            },
+        },
+        "dataset": "UserInfoDatasetDerived",
+        "dsVersion": 1,
+        "cdc": "Debezium",
+        "stacked": True,
+    }
+    expected_sink_request = ParseDict(s, connector_proto.Sink())
+    assert sink_request == expected_sink_request, error_message(
+        sink_request, expected_sink_request
+    )
+
 
 def test_kafka_sink_and_source_doesnt_create_extra_extdbs():
     @meta(owner="test@test.com")
diff --git a/fennel/connectors/test_invalid_connectors.py b/fennel/connectors/test_invalid_connectors.py
index b0037083..77c8d6d3 100644
--- a/fennel/connectors/test_invalid_connectors.py
+++ b/fennel/connectors/test_invalid_connectors.py
@@ -23,10 +23,11 @@
     HTTP,
     Certificate,
 )
-from fennel.datasets import dataset, field
+from fennel.datasets import dataset, field, pipeline, Dataset
 from fennel.expr import col
 from fennel.integrations import Secret
 from fennel.lib import meta
+from fennel.lib.params import inputs
 
 # noinspection PyUnresolvedReferences
 from fennel.testing import *
@@ -113,6 +114,13 @@
     ca_cert=Certificate(aws_secret["ca_cert"]),
 )
 
+http_with_secret = HTTP(
+    name="http_sink_with_secret",
+    host=aws_secret["http_host"],
+    healthz="/health",
+    ca_cert=Certificate(aws_secret["ca_cert"]),
+)
+
 
 def test_simple_source():
     with pytest.raises(TypeError) as e:
@@ -1085,3 +1093,188 @@ class UserInfoDataset:
         "Field `age` defined in schema for eval where has different type in the dataframe."
         == str(e.value)
     )
+
+
+@mock
+def test_multiple_non_stacked_sinks(client):
+    with pytest.raises(ValueError) as e:
+
+        @source(
+            kafka.topic("test_topic"),
+            disorder="14d",
+            cdc="upsert",
+            env=["prod_new4"],
+        )
+        @dataset
+        class UserInfoDataset:
+            user_id: int = field(key=True)
+            name: str
+            gender: str
+            # Users date of birth
+            dob: str
+            age: int
+            account_creation_date: datetime
+            country: Optional[str]
+            timestamp: datetime = field(timestamp=True)
+
+        @sink(
+            http_with_secret.path(
+                endpoint="/sink", limit=100, headers={"Foo": "Bar"}
+            ),
+            cdc="debezium",
+            env=["prod_new4"],
+            stacked=True,
+        )
+        @sink(
+            http_with_secret.path(
+                endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
+            ),
+            cdc="debezium",
+            env=["prod_new4"],
+            stacked=True,
+        )
+        @dataset
+        class UserInfoDatasetDerived:
+            user_id: int = field(key=True)
+            name: str
+            gender: str
+            # Users date of birth
+            dob: str
+            age: int
+            account_creation_date: datetime
+            country: Optional[str]
+            timestamp: datetime = field(timestamp=True)
+
+            @pipeline
+            @inputs(UserInfoDataset)
+            def create_user_transactions(cls, dataset: Dataset):
+                return dataset
+
+        client.commit(
+            message="msg",
+            datasets=[UserInfoDataset, UserInfoDatasetDerived],
+            featuresets=[],
+        )
+
+    assert (
+        "Expected 1 stacked sinks on top of dataset: UserInfoDatasetDerived but found 2"
+        == str(e.value)
+    )
+
+    with pytest.raises(ValueError) as e:
+
+        @source(
+            kafka.topic("test_topic"),
+            disorder="14d",
+            cdc="upsert",
+            env=["prod_new4"],
+        )
+        @dataset
+        class UserInfoDataset:
+            user_id: int = field(key=True)
+            name: str
+            gender: str
+            # Users date of birth
+            dob: str
+            age: int
+            account_creation_date: datetime
+            country: Optional[str]
+            timestamp: datetime = field(timestamp=True)
+
+        @sink(
+            http_with_secret.path(
+                endpoint="/sink", limit=100, headers={"Foo": "Bar"}
+            ),
+            cdc="debezium",
+            env=["prod_new4"],
+        )
+        @sink(
+            http_with_secret.path(
+                endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
+            ),
+            cdc="debezium",
+            env=["prod_new4"],
+        )
+        @dataset
+        class UserInfoDatasetDerived:
+            user_id: int = field(key=True)
+            name: str
+            gender: str
+            # Users date of birth
+            dob: str
+            age: int
+            account_creation_date: datetime
+            country: Optional[str]
+            timestamp: datetime = field(timestamp=True)
+
+            @pipeline
+            @inputs(UserInfoDataset)
+            def create_user_transactions(cls, dataset: Dataset):
+                return dataset
+
+        client.commit(
+            message="msg",
+            datasets=[UserInfoDataset, UserInfoDatasetDerived],
+            featuresets=[],
+        )
+
+    assert (
+        "Add the new sinks as stacked on top of dataset: UserInfoDatasetDerived"
+        == str(e.value)
+    )
+
+    with pytest.raises(ValueError) as e:
+
+        @source(
+            kafka.topic("test_topic"),
+            disorder="14d",
+            cdc="upsert",
+            env=["prod_new4"],
+        )
+        @dataset
+        class UserInfoDataset:
+            user_id: int = field(key=True)
+            name: str
+            gender: str
+            # Users date of birth
+            dob: str
+            age: int
+            account_creation_date: datetime
+            country: Optional[str]
+            timestamp: datetime = field(timestamp=True)
+
+        @sink(
+            http_with_secret.path(
+                endpoint="/sink3", limit=100, headers={"Foo": "Bar"}
+            ),
+            cdc="debezium",
+            env=["prod_new4"],
+            stacked=True,
+        )
+        @dataset
+        class UserInfoDatasetDerived:
+            user_id: int = field(key=True)
+            name: str
+            gender: str
+            # Users date of birth
+            dob: str
+            age: int
+            account_creation_date: datetime
+            country: Optional[str]
+            timestamp: datetime = field(timestamp=True)
+
+            @pipeline
+            @inputs(UserInfoDataset)
+            def create_user_transactions(cls, dataset: Dataset):
+                return dataset
+
+        client.commit(
+            message="msg",
+            datasets=[UserInfoDataset, UserInfoDatasetDerived],
+            featuresets=[],
+        )
+
+    assert (
+        "Stacked sink not supported when there is only one sink added on top of dataset: UserInfoDatasetDerived"
+        == str(e.value)
+    )
diff --git a/fennel/gen/connector_pb2.py b/fennel/gen/connector_pb2.py
index 33f1b072..37391783 100644
--- a/fennel/gen/connector_pb2.py
+++ b/fennel/gen/connector_pb2.py
@@ -22,7 +22,7 @@
 import fennel.gen.secret_pb2 as secret__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x65xpression.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\x1a\x0csecret.proto\"\xba\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x12,\n\x04http\x18\x0e \x01(\x0b\x32\x1c.fennel.proto.connector.HttpH\x00\x42\t\n\x07variant\"?\n\x10SamplingStrategy\x12\x15\n\rsampling_rate\x18\x01 \x01(\x01\x12\x14\n\x0c\x63olumns_used\x18\x02 \x03(\t\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x10\n\x0e\x44\x65\x62\x65ziumFormat\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x8c\x02\n\x05MySQL\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\x8f\x02\n\x08Postgres\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xb0\x02\n\x02S3\x12\x1f\n\x15\x61ws_secret_access_key\x18\x01 \x01(\tH\x00\x12\x46\n\x1c\x61ws_secret_access_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1b\n\x11\x61ws_access_key_id\x18\x02 \x01(\tH\x01\x12\x42\n\x18\x61ws_access_key_id_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x1f\n\x1d\x61ws_secret_access_key_variantB\x1b\n\x19\x61ws_access_key_id_variantB\x0b\n\t_role_arn\"\xb6\x01\n\x08\x42igquery\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xa1\x02\n\tSnowflake\x12\x0e\n\x04user\x18\x02 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x03 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xf9\x02\n\x05Kafka\x12\x1d\n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x12>\n\x14sasl_username_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1d\n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x12>\n\x14sasl_password_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x17\n\x15sasl_username_variantB\x17\n\x15sasl_password_variant\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xd3\x01\n\x0b\x43redentials\x12\x12\n\x08username\x18\x01 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x02 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x42\x12\n\x10username_variantB\x12\n\x10password_variant\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"\xe9\x01\n\x05Mongo\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x06 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xa0\x01\n\x06PubSub\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\nproject_id\x18\x01 \x01(\tB\x1d\n\x1bservice_account_key_variant\"s\n\x0eSensitiveDatum\x12\x10\n\x06secret\x18\x01 \x01(\tH\x00\x12\x34\n\nsecret_ref\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x42\x19\n\x17sensitive_datum_variant\"\xb8\x01\n\x04Http\x12\x0e\n\x04host\x18\x01 \x01(\tH\x00\x12\x35\n\x0bhost_secret\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x0f\n\x07healthz\x18\x03 \x01(\t\x12<\n\x07\x63\x61_cert\x18\x04 \x01(\x0b\x32&.fennel.proto.connector.SensitiveDatumH\x01\x88\x01\x01\x42\x0e\n\x0chost_variantB\n\n\x08_ca_cert\"\xf7\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x12\x35\n\thttp_path\x18\x0c \x01(\x0b\x32 .fennel.proto.connector.HttpPathH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xdb\x01\n\x08HttpPath\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12\x12\n\x05limit\x18\x03 \x01(\rH\x00\x88\x01\x01\x12>\n\x07headers\x18\x04 \x03(\x0b\x32-.fennel.proto.connector.HttpPath.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x08\n\x06_limit\"\x9e\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12-\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_type\"\x83\x01\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12,\n\x04\x65val\x18\x03 \x01(\x0b\x32\x1c.fennel.proto.connector.EvalH\x00\x42\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xa5\x07\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x12H\n\x11sampling_strategy\x18\x10 \x01(\x0b\x32(.fennel.proto.connector.SamplingStrategyH\x03\x88\x01\x01\x12\x37\n\x0b\x66ilter_expr\x18\x11 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x04\x88\x01\x01\x12\x37\n\rfilter_schema\x18\x12 \x01(\x0b\x32\x1b.fennel.proto.schema.SchemaH\x05\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filterB\x14\n\x12_sampling_strategyB\x0e\n\x0c_filter_exprB\x10\n\x0e_filter_schema\"\xee\x03\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_until*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x65xpression.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\x1a\x0csecret.proto\"\xba\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x12,\n\x04http\x18\x0e \x01(\x0b\x32\x1c.fennel.proto.connector.HttpH\x00\x42\t\n\x07variant\"?\n\x10SamplingStrategy\x12\x15\n\rsampling_rate\x18\x01 \x01(\x01\x12\x14\n\x0c\x63olumns_used\x18\x02 \x03(\t\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x10\n\x0e\x44\x65\x62\x65ziumFormat\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x8c\x02\n\x05MySQL\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\x8f\x02\n\x08Postgres\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xb0\x02\n\x02S3\x12\x1f\n\x15\x61ws_secret_access_key\x18\x01 \x01(\tH\x00\x12\x46\n\x1c\x61ws_secret_access_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1b\n\x11\x61ws_access_key_id\x18\x02 \x01(\tH\x01\x12\x42\n\x18\x61ws_access_key_id_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x1f\n\x1d\x61ws_secret_access_key_variantB\x1b\n\x19\x61ws_access_key_id_variantB\x0b\n\t_role_arn\"\xb6\x01\n\x08\x42igquery\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xa1\x02\n\tSnowflake\x12\x0e\n\x04user\x18\x02 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x03 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xf9\x02\n\x05Kafka\x12\x1d\n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x12>\n\x14sasl_username_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1d\n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x12>\n\x14sasl_password_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x17\n\x15sasl_username_variantB\x17\n\x15sasl_password_variant\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xd3\x01\n\x0b\x43redentials\x12\x12\n\x08username\x18\x01 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x02 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x42\x12\n\x10username_variantB\x12\n\x10password_variant\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"\xe9\x01\n\x05Mongo\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x06 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xa0\x01\n\x06PubSub\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\nproject_id\x18\x01 \x01(\tB\x1d\n\x1bservice_account_key_variant\"s\n\x0eSensitiveDatum\x12\x10\n\x06secret\x18\x01 \x01(\tH\x00\x12\x34\n\nsecret_ref\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x42\x19\n\x17sensitive_datum_variant\"\xb8\x01\n\x04Http\x12\x0e\n\x04host\x18\x01 \x01(\tH\x00\x12\x35\n\x0bhost_secret\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x0f\n\x07healthz\x18\x03 \x01(\t\x12<\n\x07\x63\x61_cert\x18\x04 \x01(\x0b\x32&.fennel.proto.connector.SensitiveDatumH\x01\x88\x01\x01\x42\x0e\n\x0chost_variantB\n\n\x08_ca_cert\"\xf7\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x12\x35\n\thttp_path\x18\x0c \x01(\x0b\x32 .fennel.proto.connector.HttpPathH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xdb\x01\n\x08HttpPath\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12\x12\n\x05limit\x18\x03 \x01(\rH\x00\x88\x01\x01\x12>\n\x07headers\x18\x04 \x03(\x0b\x32-.fennel.proto.connector.HttpPath.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x08\n\x06_limit\"\x9e\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12-\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_type\"\x83\x01\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12,\n\x04\x65val\x18\x03 \x01(\x0b\x32\x1c.fennel.proto.connector.EvalH\x00\x42\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xa5\x07\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x12H\n\x11sampling_strategy\x18\x10 \x01(\x0b\x32(.fennel.proto.connector.SamplingStrategyH\x03\x88\x01\x01\x12\x37\n\x0b\x66ilter_expr\x18\x11 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x04\x88\x01\x01\x12\x37\n\rfilter_schema\x18\x12 \x01(\x0b\x32\x1b.fennel.proto.schema.SchemaH\x05\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filterB\x14\n\x12_sampling_strategyB\x0e\n\x0c_filter_exprB\x10\n\x0e_filter_schema\"\x90\x04\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x12\x14\n\x07stacked\x18\x0b \x01(\x08H\x04\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_untilB\n\n\x08_stacked*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -39,8 +39,8 @@
   _globals['_SOURCE_PREPROCENTRY']._serialized_options = b'8\001'
   _globals['_SINK_RENAMESENTRY']._loaded_options = None
   _globals['_SINK_RENAMESENTRY']._serialized_options = b'8\001'
-  _globals['_CDCSTRATEGY']._serialized_start=9088
-  _globals['_CDCSTRATEGY']._serialized_end=9163
+  _globals['_CDCSTRATEGY']._serialized_start=9122
+  _globals['_CDCSTRATEGY']._serialized_end=9197
   _globals['_EXTDATABASE']._serialized_start=207
   _globals['_EXTDATABASE']._serialized_end=905
   _globals['_SAMPLINGSTRATEGY']._serialized_start=907
@@ -136,7 +136,7 @@
   _globals['_SOURCE_PREPROCENTRY']._serialized_start=8414
   _globals['_SOURCE_PREPROCENTRY']._serialized_end=8498
   _globals['_SINK']._serialized_start=8592
-  _globals['_SINK']._serialized_end=9086
-  _globals['_SINK_RENAMESENTRY']._serialized_start=9004
-  _globals['_SINK_RENAMESENTRY']._serialized_end=9050
+  _globals['_SINK']._serialized_end=9120
+  _globals['_SINK_RENAMESENTRY']._serialized_start=9026
+  _globals['_SINK_RENAMESENTRY']._serialized_end=9072
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/connector_pb2.pyi b/fennel/gen/connector_pb2.pyi
index afb8fe07..763b39aa 100644
--- a/fennel/gen/connector_pb2.pyi
+++ b/fennel/gen/connector_pb2.pyi
@@ -1420,6 +1420,7 @@ class Sink(google.protobuf.message.Message):
     RENAMES_FIELD_NUMBER: builtins.int
     SINCE_FIELD_NUMBER: builtins.int
     UNTIL_FIELD_NUMBER: builtins.int
+    STACKED_FIELD_NUMBER: builtins.int
     @property
     def table(self) -> global___ExtTable: ...
     dataset: builtins.str
@@ -1436,6 +1437,7 @@ class Sink(google.protobuf.message.Message):
     def since(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
     @property
     def until(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
+    stacked: builtins.bool
     def __init__(
         self,
         *,
@@ -1449,9 +1451,10 @@ class Sink(google.protobuf.message.Message):
         renames: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
         since: google.protobuf.timestamp_pb2.Timestamp | None = ...,
         until: google.protobuf.timestamp_pb2.Timestamp | None = ...,
+        stacked: builtins.bool | None = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_until", b"_until", "cdc", b"cdc", "every", b"every", "how", b"how", "since", b"since", "table", b"table", "until", b"until"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_until", b"_until", "cdc", b"cdc", "create", b"create", "dataset", b"dataset", "ds_version", b"ds_version", "every", b"every", "how", b"how", "renames", b"renames", "since", b"since", "table", b"table", "until", b"until"]) -> None: ...
+    def HasField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_stacked", b"_stacked", "_until", b"_until", "cdc", b"cdc", "every", b"every", "how", b"how", "since", b"since", "stacked", b"stacked", "table", b"table", "until", b"until"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_stacked", b"_stacked", "_until", b"_until", "cdc", b"cdc", "create", b"create", "dataset", b"dataset", "ds_version", b"ds_version", "every", b"every", "how", b"how", "renames", b"renames", "since", b"since", "stacked", b"stacked", "table", b"table", "until", b"until"]) -> None: ...
     @typing.overload
     def WhichOneof(self, oneof_group: typing_extensions.Literal["_cdc", b"_cdc"]) -> typing_extensions.Literal["cdc"] | None: ...
     @typing.overload
@@ -1459,6 +1462,8 @@ class Sink(google.protobuf.message.Message):
     @typing.overload
     def WhichOneof(self, oneof_group: typing_extensions.Literal["_since", b"_since"]) -> typing_extensions.Literal["since"] | None: ...
     @typing.overload
+    def WhichOneof(self, oneof_group: typing_extensions.Literal["_stacked", b"_stacked"]) -> typing_extensions.Literal["stacked"] | None: ...
+    @typing.overload
     def WhichOneof(self, oneof_group: typing_extensions.Literal["_until", b"_until"]) -> typing_extensions.Literal["until"] | None: ...
 
 global___Sink = Sink
diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py
index c9b68e6c..9296c03d 100644
--- a/fennel/internal_lib/to_proto/to_proto.py
+++ b/fennel/internal_lib/to_proto/to_proto.py
@@ -178,9 +178,12 @@ def to_sync_request_proto(
                     external_dbs.append(ext_db)
 
             db_with_sinks = sinks_from_ds(obj, is_source_dataset, env)
+            num_stacked_sinks = 0
 
-            for ext_db, sinks in db_with_sinks:
-                conn_sinks.append(sinks)
+            for ext_db, sink in db_with_sinks:
+                if sink.stacked:
+                    num_stacked_sinks += 1
+                conn_sinks.append(sink)
                 # dedup external dbs by the name
                 # TODO(mohit): Also validate that if the name is the same, there should
                 # be no difference in the other fields
@@ -188,6 +191,25 @@ def to_sync_request_proto(
                     external_dbs_by_name[ext_db.name] = ext_db
                     external_dbs.append(ext_db)
 
+            # Perform validations for stacked sinks
+            if len(db_with_sinks) == 1 and num_stacked_sinks == 1:
+                raise ValueError(
+                    f"Stacked sink not supported when there is only one sink added on top of dataset: {obj._name}"
+                )
+
+            if len(db_with_sinks) > 1 and num_stacked_sinks == 0:
+                raise ValueError(
+                    f"Add the new sinks as stacked on top of dataset: {obj._name}"
+                )
+
+            if (
+                len(db_with_sinks) > 1
+                and num_stacked_sinks != len(db_with_sinks) - 1
+            ):
+                raise ValueError(
+                    f"Expected {len(db_with_sinks) - 1} stacked sinks on top of dataset: {obj._name} but found {num_stacked_sinks}"
+                )
+
         elif isinstance(obj, Featureset):
             featuresets.append(featureset_to_proto(obj))
             features.extend(features_from_fs(obj))
@@ -1130,6 +1152,7 @@ def _kafka_conn_to_sink_proto(
         renames=_renames_to_proto(connector.renames),
         since=_to_timestamp_proto(connector.since),
         until=_to_timestamp_proto(connector.until),
+        stacked=connector.stacked,
     )
     return ext_db, sink
 
@@ -1190,6 +1213,7 @@ def _http_conn_to_sink_proto(
         renames=_renames_to_proto(connector.renames),
         since=_to_timestamp_proto(connector.since),
         until=_to_timestamp_proto(connector.until),
+        stacked=connector.stacked,
     )
     return ext_db, sink
 
@@ -1423,6 +1447,7 @@ def _s3_conn_to_sink_proto(
         renames=_renames_to_proto(connector.renames),
         since=_to_timestamp_proto(connector.since),
         until=_to_timestamp_proto(connector.until),
+        stacked=connector.stacked,
     )
     return ext_db, sink
 
@@ -1891,6 +1916,7 @@ def _snowflake_conn_to_sink_proto(
         renames=_renames_to_proto(connector.renames),
         since=_to_timestamp_proto(connector.since),
         until=_to_timestamp_proto(connector.until),
+        stacked=connector.stacked,
     )
     return ext_db, sink
 
diff --git a/pyproject.toml b/pyproject.toml
index 000ed2d3..3111cc18 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "fennel-ai"
-version = "1.5.58"
+version = "1.5.59"
 description = "The modern realtime feature engineering platform"
 authors = ["Fennel AI "]
 packages = [{ include = "fennel" }]