From 130d2403da4444cf7403d041e129c94a27c8b297 Mon Sep 17 00:00:00 2001 From: Nitin Bansal Date: Tue, 10 Dec 2024 16:58:54 +0530 Subject: [PATCH] aggregation: allow specifying none as default in aggregations --- .../api-reference/aggregations/lastk.py | 2 + .../api-reference/aggregations/average.md | 18 +-- .../aggregations/exponential-decay-sum.md | 6 +- .../pages/api-reference/aggregations/lastk.md | 5 + docs/pages/api-reference/aggregations/max.md | 13 +- docs/pages/api-reference/aggregations/min.md | 13 +- .../api-reference/aggregations/quantile.md | 9 +- .../api-reference/aggregations/stddev.md | 17 ++- docs/pages/api-reference/aggregations/sum.md | 9 +- fennel/CHANGELOG.md | 3 + .../client_tests/test_complex_aggregation.py | 127 ++++++++++++++++++ fennel/client_tests/test_date_type.py | 8 +- fennel/datasets/aggregate.py | 70 +++++++--- fennel/datasets/datasets.py | 87 +++++++++--- fennel/datasets/test_schema_validator.py | 2 +- fennel/gen/connector_pb2.py | 12 +- fennel/gen/connector_pb2.pyi | 9 +- fennel/gen/spec_pb2.py | 40 +++--- fennel/gen/spec_pb2.pyi | 20 ++- fennel/internal_lib/utils/utils.py | 28 +++- fennel/testing/execute_aggregation.py | 10 +- fennel/testing/executor.py | 14 +- pyproject.toml | 2 +- 23 files changed, 399 insertions(+), 125 deletions(-) diff --git a/docs/examples/api-reference/aggregations/lastk.py b/docs/examples/api-reference/aggregations/lastk.py index 6a186b4eb..cbb632468 100644 --- a/docs/examples/api-reference/aggregations/lastk.py +++ b/docs/examples/api-reference/aggregations/lastk.py @@ -41,6 +41,7 @@ def lastk_pipeline(cls, ds: Dataset): of="amount", limit=10, dedup=False, + dropnull=False, window=Continuous("1d"), ), # docsnip-highlight end @@ -142,6 +143,7 @@ def bad_pipeline(cls, ds: Dataset): of="amount", limit=10, dedup=False, + dropnull=False, window=Continuous("1d"), ), # docsnip-highlight end diff --git a/docs/pages/api-reference/aggregations/average.md b/docs/pages/api-reference/aggregations/average.md index c9328b103..5fbb23dff 100644 --- a/docs/pages/api-reference/aggregations/average.md +++ b/docs/pages/api-reference/aggregations/average.md @@ -22,9 +22,10 @@ The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type `float`. - + Average over an empty set of rows isn't well defined - Fennel returns `default` -in such cases. +in such cases. If the default is not set or is None, Fennel returns None and +in that case, the expected type of `into_field` must be `Optional[float]`.
 
 #### Returns
-
+
 Stores the result of the aggregation in the appropriate field of the output 
 dataset. If there are no rows in the aggregation window, `default` is used.
 
 
 
 #### Errors
-
-The input column denoted by `of` must either be of `int` or `float` types. 
+
+The input column denoted by `of` must be of `int`, `float` or 
+`decimal` types.
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
@@ -52,7 +54,7 @@ The type of the field denoted by `into_field` in the output dataset and that of
 
 
 
+    message="Can not take average over string, only int or float or decimal">
 
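A minimal sketch of the behaviour documented above for average.md: with `default` left as None, the aggregated column must be declared `Optional[float]`. The dataset and field names below are illustrative assumptions and not part of this patch; only the `Average(...)` parameters follow the documented API, and source/sink wiring is omitted.

```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.dtypes import Continuous
from fennel.lib import inputs


@dataset
class Transaction:
    # source wiring omitted; this only illustrates the schema contract
    uid: int
    amount: float
    timestamp: datetime


@dataset(index=True)
class AvgTransaction:
    uid: int = field(key=True)
    # default=None below, so empty windows yield None and the column
    # must be Optional[float] rather than float.
    avg_amount: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def avg_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            Average(
                of="amount",
                window=Continuous("1d"),
                default=None,
                into_field="avg_amount",
            ),
        )
```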
diff --git a/docs/pages/api-reference/aggregations/exponential-decay-sum.md b/docs/pages/api-reference/aggregations/exponential-decay-sum.md
index fd61e5403..6c6be0091 100644
--- a/docs/pages/api-reference/aggregations/exponential-decay-sum.md
+++ b/docs/pages/api-reference/aggregations/exponential-decay-sum.md
@@ -9,7 +9,7 @@ Aggregation to compute a rolling exponential decay for each group within a windo
 #### Parameters
 
 Name of the field in the input dataset over which the decayed sum should be computed. 
-This field can only either be `int` or `float.
+This field can only be of type `int`, `float` or `decimal`.
 
 
 
@@ -46,8 +46,8 @@ are no rows to count, by default, it returns 0.0
 The input column denoted by `of` must either be of `int` or `float` types. 
 The output field denoted by `into_field` must always be of type `float`.
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
 
+
+If set to True, None values are dropped from the result. It expects the `of`
+field to be of type `Optional[T]`, and `into_field` gets the type `List[T]`.
+
+
 
 
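The `dropnull` behaviour above can be sketched the same way, following the shape of the lastk docsnip earlier in this patch. The datasets and field names are assumptions for illustration only.

```python
from datetime import datetime
from typing import List, Optional

from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.dtypes import Continuous
from fennel.lib import inputs


@dataset
class Transaction:
    uid: int
    merchant: Optional[str]  # dropnull=True requires an Optional[T] column
    timestamp: datetime


@dataset(index=True)
class LastMerchants:
    uid: int = field(key=True)
    # With dropnull=True the None values are dropped, so the output is
    # List[str] rather than List[Optional[str]].
    merchants: List[str]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def last_merchants(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            LastK(
                of="merchant",
                limit=10,
                dedup=False,
                dropnull=True,
                window=Continuous("7d"),
                into_field="merchants",
            ),
        )
```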
diff --git a/docs/pages/api-reference/aggregations/max.md b/docs/pages/api-reference/aggregations/max.md
index 2c3c2bafd..edb7f7dcf 100644
--- a/docs/pages/api-reference/aggregations/max.md
+++ b/docs/pages/api-reference/aggregations/max.md
@@ -24,10 +24,11 @@ aggregation. This field is expected to be of type `int`, `float`, `date` or
 `of`.
 
 
-
+
 Max over an empty set of rows isn't well defined - Fennel returns `default`
 in such cases. The type of `default` must be same as that of `of` in the input
-dataset.
+dataset. If the default is not set or is None, Fennel returns None, in which case 
+the expected type of `into_field` must be `Optional[T]`.
-The input column denoted by `of` must be of `int`, `float`, `date` or `datetime`
-types. 
+The input column denoted by `of` must be of `int`, `float`, `decimal`, 
+`date` or `datetime` types. 
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
diff --git a/docs/pages/api-reference/aggregations/min.md b/docs/pages/api-reference/aggregations/min.md
index 5f001ec79..98cd34304 100644
--- a/docs/pages/api-reference/aggregations/min.md
+++ b/docs/pages/api-reference/aggregations/min.md
@@ -24,10 +24,11 @@ aggregation. This field is expected to be of type `int`, `float`, `date` or
 `of`.
 
 
-
+
 Min over an empty set of rows isn't well defined - Fennel returns `default`
 in such cases. The type of `default` must be same as that of `of` in the input
-dataset.
+dataset. If the default is not set or is None, Fennel returns None, in which case 
+the expected type of `into_field` must be `Optional[T]`.
 
 
 
-The input column denoted by `of` must be of `int`, `float`, `date` or `datetime`
-types. 
+The input column denoted by `of` must be of `int`, `float`, `decimal`, 
+`date` or `datetime` types. 
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
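The same pattern applies to min/max over `date` and `datetime` columns, whose defaults are type-checked by the validator changes later in this diff. A sketch under the same assumptions (illustrative names, source wiring omitted):

```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs


@dataset
class Login:
    uid: int
    login_at: datetime
    timestamp: datetime = field(timestamp=True)


@dataset(index=True)
class FirstLogin:
    uid: int = field(key=True)
    # default=None, so the column is Optional[datetime]; a non-None default
    # would itself have to be a datetime (see fennel/datasets/datasets.py below).
    first_seen: Optional[datetime]
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Login)
    def first_login(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            Min(
                of="login_at",
                window=Continuous("30d"),
                default=None,
                into_field="first_seen",
            ),
        )
```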
diff --git a/docs/pages/api-reference/aggregations/quantile.md b/docs/pages/api-reference/aggregations/quantile.md
index 6901eb777..2391c7982 100644
--- a/docs/pages/api-reference/aggregations/quantile.md
+++ b/docs/pages/api-reference/aggregations/quantile.md
@@ -58,10 +58,11 @@ dataset. If there are no rows in the aggregation window, `default` is used.
 
 #### Errors
 
-The input column denoted by `of` must either be of `int` or `float` types. 
+The input column denoted by `of` must be of `int`, `float` or 
+`decimal` types.
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
@@ -81,7 +82,7 @@ right expectations and be compatible with future addition of exact quantiles.
 
 
 
+    message="Can not take quantile over string, only int or float or decimal">
 
diff --git a/docs/pages/api-reference/aggregations/stddev.md b/docs/pages/api-reference/aggregations/stddev.md
index 7fe667981..375d8c6ef 100644
--- a/docs/pages/api-reference/aggregations/stddev.md
+++ b/docs/pages/api-reference/aggregations/stddev.md
@@ -22,9 +22,11 @@ The name of the field in the output dataset that should store the result of this
 aggregation. This field is expected to be of type `float`.
 
 
-
+
 Standard deviation over an empty set of rows isn't well defined - Fennel 
-returns `default` in such cases.
+returns `default` in such cases. If the default is not set or is None, 
+Fennel returns None, in which case the expected type of `into_field` 
+must be `Optional[float]`.
 
 
 
 
 #### Returns
-
+
 Stores the result of the aggregation in the appropriate field of the output 
 dataset. If there are no rows in the aggregation window, `default` is used.
 
@@ -40,10 +42,11 @@ dataset. If there are no rows in the aggregation window, `default` is used.
 
 #### Errors
 
-The input column denoted by `of` must either be of `int` or `float` types. 
+The input column denoted by `of` must be of `int`, `float` or 
+`decimal` types.
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
@@ -52,7 +55,7 @@ The type of the field denoted by `into_field` in the output dataset and that of
 
 
 
+    message="Can not take stddev over string, only int or float or decimal">
 
 
 #### Returns
-
+
 Accumulates the count in the appropriate field of the output dataset. If there 
 are no rows to count, by default, it returns 0 (or 0.0 if `of` is float).
 
@@ -36,10 +36,11 @@ are no rows to count, by default, it returns 0 (or 0.0 if `of` is float).
 
 #### Errors
 
-The input column denoted by `of` must either be of `int` or `float` types. 
+The input column denoted by `of` must be of `int`, `float` 
+or `decimal` types.
 
-Note that unlike SQL, even aggregations over `Optional[int]` or `Optional[float]` 
-aren't allowed.
+Note that, as in SQL, aggregations over `Optional[int]` or `Optional[float]` 
+are allowed.
 
 
 
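Since Optional inputs are now accepted, a column such as `Optional[int]` can be summed directly. The sketch below assumes, beyond what the docs above state, that None rows are simply skipped and that an empty window still returns 0, so the output stays a plain `int`; names are illustrative.

```python
from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset, Sum
from fennel.dtypes import Continuous
from fennel.lib import inputs


@dataset
class Order:
    uid: int
    discount: Optional[int]  # Optional inputs are now accepted by Sum
    timestamp: datetime


@dataset(index=True)
class TotalDiscount:
    uid: int = field(key=True)
    # Assumption: None rows are skipped and empty windows return 0,
    # so the output column stays a plain int.
    total_discount: int
    timestamp: datetime

    @pipeline
    @inputs(Order)
    def total(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            Sum(of="discount", window=Continuous("28d"), into_field="total_discount"),
        )
```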
 DSSchema:
                     raise TypeError(
                         f"Cannot take average of field `{agg.of}` of type `{dtype_to_string(dtype)}`"
                     )
-                values[agg.into_field] = pd.Float64Dtype  # type: ignore
+                if agg.default is None:
+                    values[agg.into_field] = Optional[pd.Float64Dtype]  # type: ignore
+                else:
+                    values[agg.into_field] = pd.Float64Dtype  # type: ignore
             elif isinstance(agg, LastK):
                 dtype = input_schema.get_type(agg.of)
                 if agg.dropnull:
@@ -2831,15 +2835,37 @@ def visitAggregate(self, obj) -> DSSchema:
                 ]
                 if primtive_dtype not in allowed_types:
                     raise TypeError(
-                        f"invalid min: type of field `{agg.of}` is not int, float, date or datetime"
+                        f"invalid min: type of field `{agg.of}` is not int, float, decimal, date or datetime"
                     )
-                if primtive_dtype == pd.Int64Dtype and (
-                    int(agg.default) != agg.default  # type: ignore
-                ):
-                    raise TypeError(
-                        f"invalid min: default value `{agg.default}` not of type `int`"
-                    )
-                values[agg.into_field] = fennel_get_optional_inner(dtype)  # type: ignore
+                if agg.default is not None:
+                    if primtive_dtype == pd.Int64Dtype and (
+                        int(agg.default) != agg.default  # type: ignore
+                    ):
+                        raise TypeError(
+                            f"invalid min: default value `{agg.default}` not of type `int`"
+                        )
+                    if isinstance(primtive_dtype, Decimal) and not isinstance(
+                        agg.default, PythonDecimal
+                    ):
+                        raise TypeError(
+                            f"invalid min: default value `{agg.default}` not of type `Decimal`"
+                        )
+                    if primtive_dtype == datetime.date and not isinstance(
+                        agg.default, datetime.date
+                    ):
+                        raise TypeError(
+                            f"invalid min: default value `{agg.default}` not of type `date`"
+                        )
+                    if primtive_dtype == datetime.datetime and not isinstance(
+                        agg.default, datetime.datetime
+                    ):
+                        raise TypeError(
+                            f"invalid min: default value `{agg.default}` not of type `datetime`"
+                        )
+                if agg.default is None:
+                    values[agg.into_field] = Optional[fennel_get_optional_inner(dtype)]  # type: ignore
+                else:
+                    values[agg.into_field] = fennel_get_optional_inner(dtype)  # type: ignore
             elif isinstance(agg, Max):
                 dtype = input_schema.get_type(agg.of)
                 primtive_dtype = get_primitive_dtype_with_optional(dtype)
@@ -2849,15 +2875,37 @@ def visitAggregate(self, obj) -> DSSchema:
                 ]
                 if primtive_dtype not in allowed_types:
                     raise TypeError(
-                        f"invalid max: type of field `{agg.of}` is not int, float, date or datetime"
+                        f"invalid max: type of field `{agg.of}` is not int, float, decimal, date or datetime"
                     )
-                if primtive_dtype == pd.Int64Dtype and (
-                    int(agg.default) != agg.default  # type: ignore
-                ):
-                    raise TypeError(
-                        f"invalid max: default value `{agg.default}` not of type `int`"
-                    )
-                values[agg.into_field] = fennel_get_optional_inner(dtype)  # type: ignore
+                if agg.default is not None:
+                    if primtive_dtype == pd.Int64Dtype and (
+                        int(agg.default) != agg.default  # type: ignore
+                    ):
+                        raise TypeError(
+                            f"invalid max: default value `{agg.default}` not of type `int`"
+                        )
+                    if isinstance(primtive_dtype, Decimal) and not isinstance(
+                        agg.default, PythonDecimal
+                    ):
+                        raise TypeError(
+                            f"invalid max: default value `{agg.default}` not of type `Decimal`"
+                        )
+                    if primtive_dtype == datetime.date and not isinstance(
+                        agg.default, datetime.date
+                    ):
+                        raise TypeError(
+                            f"invalid max: default value `{agg.default}` not of type `date`"
+                        )
+                    if primtive_dtype == datetime.datetime and not isinstance(
+                        agg.default, datetime.datetime
+                    ):
+                        raise TypeError(
+                            f"invalid max: default value `{agg.default}` not of type `datetime`"
+                        )
+                if agg.default is None:
+                    values[agg.into_field] = Optional[fennel_get_optional_inner(dtype)]  # type: ignore
+                else:
+                    values[agg.into_field] = fennel_get_optional_inner(dtype)  # type: ignore
             elif isinstance(agg, Stddev):
                 dtype = input_schema.get_type(agg.of)
                 if (
@@ -2867,7 +2915,10 @@ def visitAggregate(self, obj) -> DSSchema:
                     raise TypeError(
                         f"Cannot get standard deviation of field {agg.of} of type {dtype_to_string(dtype)}"
                     )
-                values[agg.into_field] = pd.Float64Dtype  # type: ignore
+                if agg.default is None:
+                    values[agg.into_field] = Optional[pd.Float64Dtype]  # type: ignore
+                else:
+                    values[agg.into_field] = pd.Float64Dtype  # type: ignore
             elif isinstance(agg, Quantile):
                 dtype = input_schema.get_type(agg.of)
                 if (
diff --git a/fennel/datasets/test_schema_validator.py b/fennel/datasets/test_schema_validator.py
index 265f689dd..70568f76c 100644
--- a/fennel/datasets/test_schema_validator.py
+++ b/fennel/datasets/test_schema_validator.py
@@ -460,7 +460,7 @@ def pipeline(cls, a: Dataset):
 
     assert (
         str(e.value)
-        == """invalid max: type of field `b` is not int, float, date or datetime"""
+        == """invalid max: type of field `b` is not int, float, decimal, date or datetime"""
     )
 
 
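The new default-type checks added to `visitAggregate` surface the same way as the error asserted above. A hypothetical test, not part of this patch, that would exercise the datetime branch; it assumes the error is raised at dataset definition time and that the message matches the validator code shown above.

```python
import pytest
from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.dtypes import Continuous
from fennel.lib import inputs


def test_min_default_must_match_field_type():
    # Hypothetical: a float default for a datetime column should trip the
    # new check in visitAggregate.
    with pytest.raises(TypeError) as e:

        @dataset
        class Login:
            uid: int
            login_at: datetime
            timestamp: datetime = field(timestamp=True)

        @dataset
        class FirstLogin:
            uid: int = field(key=True)
            first_seen: datetime
            timestamp: datetime = field(timestamp=True)

            @pipeline
            @inputs(Login)
            def p(cls, a: Dataset):
                return a.groupby("uid").aggregate(
                    Min(
                        of="login_at",
                        window=Continuous("30d"),
                        default=1.0,
                        into_field="first_seen",
                    ),
                )

    assert (
        str(e.value)
        == "invalid min: default value `1.0` not of type `datetime`"
    )
```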
diff --git a/fennel/gen/connector_pb2.py b/fennel/gen/connector_pb2.py
index 33f1b072a..373917836 100644
--- a/fennel/gen/connector_pb2.py
+++ b/fennel/gen/connector_pb2.py
@@ -22,7 +22,7 @@
 import fennel.gen.secret_pb2 as secret__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x65xpression.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\x1a\x0csecret.proto\"\xba\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x12,\n\x04http\x18\x0e \x01(\x0b\x32\x1c.fennel.proto.connector.HttpH\x00\x42\t\n\x07variant\"?\n\x10SamplingStrategy\x12\x15\n\rsampling_rate\x18\x01 \x01(\x01\x12\x14\n\x0c\x63olumns_used\x18\x02 \x03(\t\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x10\n\x0e\x44\x65\x62\x65ziumFormat\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x8c\x02\n\x05MySQL\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\x8f\x02\n\x08Postgres\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 
\x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xb0\x02\n\x02S3\x12\x1f\n\x15\x61ws_secret_access_key\x18\x01 \x01(\tH\x00\x12\x46\n\x1c\x61ws_secret_access_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1b\n\x11\x61ws_access_key_id\x18\x02 \x01(\tH\x01\x12\x42\n\x18\x61ws_access_key_id_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x1f\n\x1d\x61ws_secret_access_key_variantB\x1b\n\x19\x61ws_access_key_id_variantB\x0b\n\t_role_arn\"\xb6\x01\n\x08\x42igquery\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xa1\x02\n\tSnowflake\x12\x0e\n\x04user\x18\x02 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x03 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xf9\x02\n\x05Kafka\x12\x1d\n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x12>\n\x14sasl_username_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1d\n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x12>\n\x14sasl_password_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x17\n\x15sasl_username_variantB\x17\n\x15sasl_password_variant\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xd3\x01\n\x0b\x43redentials\x12\x12\n\x08username\x18\x01 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x02 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x42\x12\n\x10username_variantB\x12\n\x10password_variant\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"\xe9\x01\n\x05Mongo\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x06 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 
\x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xa0\x01\n\x06PubSub\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\nproject_id\x18\x01 \x01(\tB\x1d\n\x1bservice_account_key_variant\"s\n\x0eSensitiveDatum\x12\x10\n\x06secret\x18\x01 \x01(\tH\x00\x12\x34\n\nsecret_ref\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x42\x19\n\x17sensitive_datum_variant\"\xb8\x01\n\x04Http\x12\x0e\n\x04host\x18\x01 \x01(\tH\x00\x12\x35\n\x0bhost_secret\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x0f\n\x07healthz\x18\x03 \x01(\t\x12<\n\x07\x63\x61_cert\x18\x04 \x01(\x0b\x32&.fennel.proto.connector.SensitiveDatumH\x01\x88\x01\x01\x42\x0e\n\x0chost_variantB\n\n\x08_ca_cert\"\xf7\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x12\x35\n\thttp_path\x18\x0c \x01(\x0b\x32 .fennel.proto.connector.HttpPathH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 
\x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xdb\x01\n\x08HttpPath\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12\x12\n\x05limit\x18\x03 \x01(\rH\x00\x88\x01\x01\x12>\n\x07headers\x18\x04 \x03(\x0b\x32-.fennel.proto.connector.HttpPath.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x08\n\x06_limit\"\x9e\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12-\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_type\"\x83\x01\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12,\n\x04\x65val\x18\x03 \x01(\x0b\x32\x1c.fennel.proto.connector.EvalH\x00\x42\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xa5\x07\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x12H\n\x11sampling_strategy\x18\x10 \x01(\x0b\x32(.fennel.proto.connector.SamplingStrategyH\x03\x88\x01\x01\x12\x37\n\x0b\x66ilter_expr\x18\x11 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x04\x88\x01\x01\x12\x37\n\rfilter_schema\x18\x12 \x01(\x0b\x32\x1b.fennel.proto.schema.SchemaH\x05\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 
\x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filterB\x14\n\x12_sampling_strategyB\x0e\n\x0c_filter_exprB\x10\n\x0e_filter_schema\"\xee\x03\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_until*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x65xpression.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\x1a\x0csecret.proto\"\xba\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x12,\n\x04http\x18\x0e \x01(\x0b\x32\x1c.fennel.proto.connector.HttpH\x00\x42\t\n\x07variant\"?\n\x10SamplingStrategy\x12\x15\n\rsampling_rate\x18\x01 \x01(\x01\x12\x14\n\x0c\x63olumns_used\x18\x02 \x03(\t\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x10\n\x0e\x44\x65\x62\x65ziumFormat\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x8c\x02\n\x05MySQL\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\x8f\x02\n\x08Postgres\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 
\x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xb0\x02\n\x02S3\x12\x1f\n\x15\x61ws_secret_access_key\x18\x01 \x01(\tH\x00\x12\x46\n\x1c\x61ws_secret_access_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1b\n\x11\x61ws_access_key_id\x18\x02 \x01(\tH\x01\x12\x42\n\x18\x61ws_access_key_id_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x1f\n\x1d\x61ws_secret_access_key_variantB\x1b\n\x19\x61ws_access_key_id_variantB\x0b\n\t_role_arn\"\xb6\x01\n\x08\x42igquery\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xa1\x02\n\tSnowflake\x12\x0e\n\x04user\x18\x02 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x03 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xf9\x02\n\x05Kafka\x12\x1d\n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x12>\n\x14sasl_username_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1d\n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x12>\n\x14sasl_password_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x17\n\x15sasl_username_variantB\x17\n\x15sasl_password_variant\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xd3\x01\n\x0b\x43redentials\x12\x12\n\x08username\x18\x01 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x02 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x42\x12\n\x10username_variantB\x12\n\x10password_variant\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"\xe9\x01\n\x05Mongo\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x06 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 
\x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xa0\x01\n\x06PubSub\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\nproject_id\x18\x01 \x01(\tB\x1d\n\x1bservice_account_key_variant\"s\n\x0eSensitiveDatum\x12\x10\n\x06secret\x18\x01 \x01(\tH\x00\x12\x34\n\nsecret_ref\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x42\x19\n\x17sensitive_datum_variant\"\xb8\x01\n\x04Http\x12\x0e\n\x04host\x18\x01 \x01(\tH\x00\x12\x35\n\x0bhost_secret\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x0f\n\x07healthz\x18\x03 \x01(\t\x12<\n\x07\x63\x61_cert\x18\x04 \x01(\x0b\x32&.fennel.proto.connector.SensitiveDatumH\x01\x88\x01\x01\x42\x0e\n\x0chost_variantB\n\n\x08_ca_cert\"\xf7\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x12\x35\n\thttp_path\x18\x0c \x01(\x0b\x32 .fennel.proto.connector.HttpPathH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 
\x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xdb\x01\n\x08HttpPath\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12\x12\n\x05limit\x18\x03 \x01(\rH\x00\x88\x01\x01\x12>\n\x07headers\x18\x04 \x03(\x0b\x32-.fennel.proto.connector.HttpPath.HeadersEntry\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x08\n\x06_limit\"\x9e\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12-\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_type\"\x83\x01\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12,\n\x04\x65val\x18\x03 \x01(\x0b\x32\x1c.fennel.proto.connector.EvalH\x00\x42\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xa5\x07\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x12H\n\x11sampling_strategy\x18\x10 \x01(\x0b\x32(.fennel.proto.connector.SamplingStrategyH\x03\x88\x01\x01\x12\x37\n\x0b\x66ilter_expr\x18\x11 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x04\x88\x01\x01\x12\x37\n\rfilter_schema\x18\x12 \x01(\x0b\x32\x1b.fennel.proto.schema.SchemaH\x05\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 
\x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filterB\x14\n\x12_sampling_strategyB\x0e\n\x0c_filter_exprB\x10\n\x0e_filter_schema\"\x90\x04\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x12\x14\n\x07stacked\x18\x0b \x01(\x08H\x04\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_untilB\n\n\x08_stacked*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -39,8 +39,8 @@
   _globals['_SOURCE_PREPROCENTRY']._serialized_options = b'8\001'
   _globals['_SINK_RENAMESENTRY']._loaded_options = None
   _globals['_SINK_RENAMESENTRY']._serialized_options = b'8\001'
-  _globals['_CDCSTRATEGY']._serialized_start=9088
-  _globals['_CDCSTRATEGY']._serialized_end=9163
+  _globals['_CDCSTRATEGY']._serialized_start=9122
+  _globals['_CDCSTRATEGY']._serialized_end=9197
   _globals['_EXTDATABASE']._serialized_start=207
   _globals['_EXTDATABASE']._serialized_end=905
   _globals['_SAMPLINGSTRATEGY']._serialized_start=907
@@ -136,7 +136,7 @@
   _globals['_SOURCE_PREPROCENTRY']._serialized_start=8414
   _globals['_SOURCE_PREPROCENTRY']._serialized_end=8498
   _globals['_SINK']._serialized_start=8592
-  _globals['_SINK']._serialized_end=9086
-  _globals['_SINK_RENAMESENTRY']._serialized_start=9004
-  _globals['_SINK_RENAMESENTRY']._serialized_end=9050
+  _globals['_SINK']._serialized_end=9120
+  _globals['_SINK_RENAMESENTRY']._serialized_start=9026
+  _globals['_SINK_RENAMESENTRY']._serialized_end=9072
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/connector_pb2.pyi b/fennel/gen/connector_pb2.pyi
index afb8fe074..763b39aa6 100644
--- a/fennel/gen/connector_pb2.pyi
+++ b/fennel/gen/connector_pb2.pyi
@@ -1420,6 +1420,7 @@ class Sink(google.protobuf.message.Message):
     RENAMES_FIELD_NUMBER: builtins.int
     SINCE_FIELD_NUMBER: builtins.int
     UNTIL_FIELD_NUMBER: builtins.int
+    STACKED_FIELD_NUMBER: builtins.int
     @property
     def table(self) -> global___ExtTable: ...
     dataset: builtins.str
@@ -1436,6 +1437,7 @@ class Sink(google.protobuf.message.Message):
     def since(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
     @property
     def until(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
+    stacked: builtins.bool
     def __init__(
         self,
         *,
@@ -1449,9 +1451,10 @@ class Sink(google.protobuf.message.Message):
         renames: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
         since: google.protobuf.timestamp_pb2.Timestamp | None = ...,
         until: google.protobuf.timestamp_pb2.Timestamp | None = ...,
+        stacked: builtins.bool | None = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_until", b"_until", "cdc", b"cdc", "every", b"every", "how", b"how", "since", b"since", "table", b"table", "until", b"until"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_until", b"_until", "cdc", b"cdc", "create", b"create", "dataset", b"dataset", "ds_version", b"ds_version", "every", b"every", "how", b"how", "renames", b"renames", "since", b"since", "table", b"table", "until", b"until"]) -> None: ...
+    def HasField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_stacked", b"_stacked", "_until", b"_until", "cdc", b"cdc", "every", b"every", "how", b"how", "since", b"since", "stacked", b"stacked", "table", b"table", "until", b"until"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["_cdc", b"_cdc", "_how", b"_how", "_since", b"_since", "_stacked", b"_stacked", "_until", b"_until", "cdc", b"cdc", "create", b"create", "dataset", b"dataset", "ds_version", b"ds_version", "every", b"every", "how", b"how", "renames", b"renames", "since", b"since", "stacked", b"stacked", "table", b"table", "until", b"until"]) -> None: ...
     @typing.overload
     def WhichOneof(self, oneof_group: typing_extensions.Literal["_cdc", b"_cdc"]) -> typing_extensions.Literal["cdc"] | None: ...
     @typing.overload
@@ -1459,6 +1462,8 @@ class Sink(google.protobuf.message.Message):
     @typing.overload
     def WhichOneof(self, oneof_group: typing_extensions.Literal["_since", b"_since"]) -> typing_extensions.Literal["since"] | None: ...
     @typing.overload
+    def WhichOneof(self, oneof_group: typing_extensions.Literal["_stacked", b"_stacked"]) -> typing_extensions.Literal["stacked"] | None: ...
+    @typing.overload
     def WhichOneof(self, oneof_group: typing_extensions.Literal["_until", b"_until"]) -> typing_extensions.Literal["until"] | None: ...
 
 global___Sink = Sink
diff --git a/fennel/gen/spec_pb2.py b/fennel/gen/spec_pb2.py
index c0ca8043c..e61d7c3ac 100644
--- a/fennel/gen/spec_pb2.py
+++ b/fennel/gen/spec_pb2.py
@@ -15,7 +15,7 @@
 import fennel.gen.window_pb2 as window__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"\x7f\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"w\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x14\n\x0c\x64\x65\x66\x61ult_null\x18\x05 \x01(\x08\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"\x7f\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"s\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x14\n\x0c\x64\x65\x66\x61ult_null\x18\x05 \x01(\x08\"s\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x14\n\x0c\x64\x65\x66\x61ult_null\x18\x05 \x01(\x08\"v\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x14\n\x0c\x64\x65\x66\x61ult_null\x18\x05 \x01(\x08\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -27,23 +27,23 @@
   _globals['_SUM']._serialized_start=577
   _globals['_SUM']._serialized_end=653
   _globals['_AVERAGE']._serialized_start=655
-  _globals['_AVERAGE']._serialized_end=752
-  _globals['_COUNT']._serialized_start=755
-  _globals['_COUNT']._serialized_end=883
-  _globals['_LASTK']._serialized_start=885
-  _globals['_LASTK']._serialized_end=1011
-  _globals['_FIRSTK']._serialized_start=1013
-  _globals['_FIRSTK']._serialized_end=1140
-  _globals['_MIN']._serialized_start=1142
-  _globals['_MIN']._serialized_end=1235
-  _globals['_MAX']._serialized_start=1237
-  _globals['_MAX']._serialized_end=1330
-  _globals['_STDDEV']._serialized_start=1332
-  _globals['_STDDEV']._serialized_end=1428
-  _globals['_DISTINCT']._serialized_start=1430
-  _globals['_DISTINCT']._serialized_end=1529
-  _globals['_QUANTILE']._serialized_start=1532
-  _globals['_QUANTILE']._serialized_end=1681
-  _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1683
-  _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1808
+  _globals['_AVERAGE']._serialized_end=774
+  _globals['_COUNT']._serialized_start=777
+  _globals['_COUNT']._serialized_end=905
+  _globals['_LASTK']._serialized_start=907
+  _globals['_LASTK']._serialized_end=1033
+  _globals['_FIRSTK']._serialized_start=1035
+  _globals['_FIRSTK']._serialized_end=1162
+  _globals['_MIN']._serialized_start=1164
+  _globals['_MIN']._serialized_end=1279
+  _globals['_MAX']._serialized_start=1281
+  _globals['_MAX']._serialized_end=1396
+  _globals['_STDDEV']._serialized_start=1398
+  _globals['_STDDEV']._serialized_end=1516
+  _globals['_DISTINCT']._serialized_start=1518
+  _globals['_DISTINCT']._serialized_end=1617
+  _globals['_QUANTILE']._serialized_start=1620
+  _globals['_QUANTILE']._serialized_end=1769
+  _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1771
+  _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1896
 # @@protoc_insertion_point(module_scope)
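
Note: the regenerated descriptor above adds a `default_null` bool (field number 5) to the Average, Min, Max and Stddev specs, which is why the `_serialized_start`/`_serialized_end` offsets in the hunk above all shift. A minimal sketch, assuming the regenerated module is importable as `fennel.gen.spec_pb2` (everything below is illustrative and not part of the patch):

    # Descriptor check: the new field sits at number 5 and is a bool.
    from fennel.gen import spec_pb2

    fld = spec_pb2.Average.DESCRIPTOR.fields_by_name["default_null"]
    assert fld.number == 5 and fld.type == fld.TYPE_BOOL

    # Construction: default_null presumably marks "no default configured",
    # so an empty window can surface as None instead of a sentinel value.
    avg = spec_pb2.Average(of="amount", name="avg_amount", default_null=True)
    print(avg.default_null)  # True
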
diff --git a/fennel/gen/spec_pb2.pyi b/fennel/gen/spec_pb2.pyi
index 99baeba0a..0a0cbde3e 100644
--- a/fennel/gen/spec_pb2.pyi
+++ b/fennel/gen/spec_pb2.pyi
@@ -104,11 +104,13 @@ class Average(google.protobuf.message.Message):
     NAME_FIELD_NUMBER: builtins.int
     WINDOW_FIELD_NUMBER: builtins.int
     DEFAULT_FIELD_NUMBER: builtins.int
+    DEFAULT_NULL_FIELD_NUMBER: builtins.int
     of: builtins.str
     name: builtins.str
     @property
     def window(self) -> window_pb2.Window: ...
     default: builtins.float
+    default_null: builtins.bool
     def __init__(
         self,
         *,
@@ -116,9 +118,10 @@ class Average(google.protobuf.message.Message):
         name: builtins.str = ...,
         window: window_pb2.Window | None = ...,
         default: builtins.float = ...,
+        default_null: builtins.bool = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_null", b"default_null", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
 
 global___Average = Average
 
@@ -226,11 +229,13 @@ class Min(google.protobuf.message.Message):
     NAME_FIELD_NUMBER: builtins.int
     WINDOW_FIELD_NUMBER: builtins.int
     DEFAULT_FIELD_NUMBER: builtins.int
+    DEFAULT_NULL_FIELD_NUMBER: builtins.int
     of: builtins.str
     name: builtins.str
     @property
     def window(self) -> window_pb2.Window: ...
     default: builtins.float
+    default_null: builtins.bool
     def __init__(
         self,
         *,
@@ -238,9 +243,10 @@ class Min(google.protobuf.message.Message):
         name: builtins.str = ...,
         window: window_pb2.Window | None = ...,
         default: builtins.float = ...,
+        default_null: builtins.bool = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_null", b"default_null", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
 
 global___Min = Min
 
@@ -252,11 +258,13 @@ class Max(google.protobuf.message.Message):
     NAME_FIELD_NUMBER: builtins.int
     WINDOW_FIELD_NUMBER: builtins.int
     DEFAULT_FIELD_NUMBER: builtins.int
+    DEFAULT_NULL_FIELD_NUMBER: builtins.int
     of: builtins.str
     name: builtins.str
     @property
     def window(self) -> window_pb2.Window: ...
     default: builtins.float
+    default_null: builtins.bool
     def __init__(
         self,
         *,
@@ -264,9 +272,10 @@ class Max(google.protobuf.message.Message):
         name: builtins.str = ...,
         window: window_pb2.Window | None = ...,
         default: builtins.float = ...,
+        default_null: builtins.bool = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_null", b"default_null", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
 
 global___Max = Max
 
@@ -278,11 +287,13 @@ class Stddev(google.protobuf.message.Message):
     NAME_FIELD_NUMBER: builtins.int
     WINDOW_FIELD_NUMBER: builtins.int
     DEFAULT_FIELD_NUMBER: builtins.int
+    DEFAULT_NULL_FIELD_NUMBER: builtins.int
     of: builtins.str
     name: builtins.str
     @property
     def window(self) -> window_pb2.Window: ...
     default: builtins.float
+    default_null: builtins.bool
     def __init__(
         self,
         *,
@@ -290,9 +301,10 @@ class Stddev(google.protobuf.message.Message):
         name: builtins.str = ...,
         window: window_pb2.Window | None = ...,
         default: builtins.float = ...,
+        default_null: builtins.bool = ...,
     ) -> None: ...
     def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_null", b"default_null", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
 
 global___Stddev = Stddev
 
diff --git a/fennel/internal_lib/utils/utils.py b/fennel/internal_lib/utils/utils.py
index 2c8688d3b..fe0c22250 100644
--- a/fennel/internal_lib/utils/utils.py
+++ b/fennel/internal_lib/utils/utils.py
@@ -1,11 +1,13 @@
 import dataclasses
-from datetime import datetime
-from decimal import Decimal
+from decimal import Decimal as PythonDecimal
+from datetime import datetime, date
 from typing import Any, Optional, Union, Dict
 
 import numpy as np
 import pandas as pd
 from frozendict import frozendict
+from google.protobuf.timestamp_pb2 import Timestamp
+
 from fennel.gen import schema_pb2 as schema_proto
 from fennel.gen.schema_pb2 import DataType
 from fennel.internal_lib import FENNEL_STRUCT
@@ -118,8 +120,8 @@ def cast_col_to_pandas(
         return pd.Series(
             [
                 (
-                    Decimal("%0.{}f".format(scale) % float(x))
-                    if not isinstance(x, Decimal)
+                    PythonDecimal("%0.{}f".format(scale) % float(x))
+                    if not isinstance(x, PythonDecimal)
                     else x
                 )
                 for x in series
@@ -219,3 +221,21 @@ def parse_datetime_in_value(
         return output
     else:
         return value
+
+
+def to_timestamp_proto(dt: datetime) -> Timestamp:
+    ts = Timestamp()
+    ts.FromDatetime(dt)
+    return ts
+
+
+def to_date_proto(dt: date) -> schema_proto.Date:
+    return schema_proto.Date(
+        days=(dt - date(1970, 1, 1)).days,
+    )
+
+
+def to_decimal_proto(decimal: PythonDecimal) -> schema_proto.Decimal:
+    exponent = abs(int(decimal.as_tuple().exponent))
+    value = int((decimal * pow(10, exponent)).to_integral_exact())
+    return schema_proto.Decimal(value=value, scale=exponent)
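
The three converters added here mirror the proto types in `schema_pb2`: datetimes become `google.protobuf.Timestamp`, dates become day counts since the Unix epoch, and decimals are split into an integer value plus a scale. A quick sketch of the expected outputs, assuming the helpers are importable at the path shown in the diff:

    from datetime import date, datetime, timezone
    from decimal import Decimal

    from fennel.internal_lib.utils.utils import (
        to_date_proto,
        to_decimal_proto,
        to_timestamp_proto,
    )

    ts = to_timestamp_proto(datetime(2024, 1, 1, tzinfo=timezone.utc))
    print(ts.seconds)                    # 1704067200

    print(to_date_proto(date(1970, 1, 11)).days)   # 10 -- days since the epoch

    dec = to_decimal_proto(Decimal("12.34"))
    print(dec.value, dec.scale)          # 1234 2, i.e. 1234 / 10**2
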
diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py
index 073629ae3..09d55c4f1 100644
--- a/fennel/testing/execute_aggregation.py
+++ b/fennel/testing/execute_aggregation.py
@@ -5,7 +5,7 @@
 from datetime import datetime, timezone, timedelta, date
 from decimal import Decimal
 from math import sqrt
-from typing import Dict, List, Type, Union, Any
+from typing import Dict, List, Type, Union, Any, Optional
 
 import numpy as np
 import pandas as pd
@@ -305,7 +305,9 @@ def top(self):
 
 
 class MinState(AggState):
-    def __init__(self, default: Union[float, int, date, datetime, Decimal]):
+    def __init__(
+        self, default: Optional[Union[float, int, date, datetime, Decimal]]
+    ):
         self.counter = Counter()  # type: ignore
         self.min_heap = Heap(heap_type="min")
         self.default = default
@@ -336,7 +338,9 @@ def get_val(self):
 
 
 class MaxState(AggState):
-    def __init__(self, default: Union[float, int, date, datetime, Decimal]):
+    def __init__(
+        self, default: Optional[Union[float, int, date, datetime, Decimal]]
+    ):
         self.counter = Counter()  # type: ignore
         self.max_heap = Heap(heap_type="max")
         self.default = default
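
`MinState` and `MaxState` now accept `None` as the default, in line with the rest of the change: when no default is configured, an empty window should yield `None` rather than a sentinel. A small sketch of the intended behaviour (the exact `get_val()` semantics are assumed here, since they are not shown in this hunk):

    from fennel.testing.execute_aggregation import MaxState, MinState

    empty_min = MinState(default=None)
    empty_max = MaxState(default=42.0)

    print(empty_min.get_val())  # expected: None, since no rows were added
    print(empty_max.get_val())  # expected: 42.0, the configured default
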
diff --git a/fennel/testing/executor.py b/fennel/testing/executor.py
index bfd6e69d1..b9527b4e9 100644
--- a/fennel/testing/executor.py
+++ b/fennel/testing/executor.py
@@ -1,13 +1,10 @@
 import copy
 import types
 from dataclasses import dataclass
-from datetime import datetime, timezone
 from typing import Any, Optional, Dict, List
 
-import numpy as np
 import pandas as pd
 from fennel.expr.visitor import ExprPrinter
-import pyarrow as pa
 from frozendict import frozendict
 
 import fennel.gen.schema_pb2 as schema_proto
@@ -28,7 +25,6 @@
     Stddev,
 )
 from fennel.gen.schema_pb2 import Field
-from fennel.internal_lib.duration import duration_to_timedelta
 from fennel.internal_lib.schema import get_datatype, fennel_is_optional
 from fennel.internal_lib.schema import validate_field_in_df
 from fennel.internal_lib.to_proto import (
@@ -510,13 +506,13 @@ def visitJoin(self, obj) -> Optional[NodeRet]:
                 list(obj.dataset.dsschema().values.keys())
             )
             merged_df = left_join_empty(
-                input_ret, right_ret, right_value_schema, obj.fields
+                input_ret, right_ret, right_value_schema, obj.fields  # type: ignore
             )
         else:
             if len(input_ret.key_fields) > 0:
                 merged_df = table_table_join(
-                    input_ret,
-                    right_ret,
+                    input_ret,  # type: ignore
+                    right_ret,  # type: ignore
                     obj.how,
                     obj.on,
                     obj.left_on,
@@ -525,8 +521,8 @@ def visitJoin(self, obj) -> Optional[NodeRet]:
                 )
             else:
                 merged_df = stream_table_join(
-                    input_ret,
-                    right_ret,
+                    input_ret,  # type: ignore
+                    right_ret,  # type: ignore
                     obj.how,
                     obj.within,
                     obj.on,
diff --git a/pyproject.toml b/pyproject.toml
index 000ed2d38..3111cc182 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "fennel-ai"
-version = "1.5.58"
+version = "1.5.59"
 description = "The modern realtime feature engineering platform"
 authors = ["Fennel AI "]
 packages = [{ include = "fennel" }]