From c9e87a7c38bf343709f6e750b5273787b88d0882 Mon Sep 17 00:00:00 2001 From: Aditya Nambiar Date: Fri, 20 Oct 2023 15:22:54 -0700 Subject: [PATCH 1/2] source: Support starting_from param in sources --- fennel/CHANGELOG.md | 3 ++ fennel/datasets/datasets.py | 3 +- fennel/gen/auth_pb2.py | 1 + fennel/gen/connector_pb2.py | 13 +++--- fennel/gen/connector_pb2.pyi | 10 ++++- fennel/gen/dataset_pb2.py | 1 + fennel/gen/expectations_pb2.py | 1 + fennel/gen/featureset_pb2.py | 1 + fennel/gen/format_pb2.py | 1 + fennel/gen/http_auth_pb2.py | 1 + fennel/gen/kinesis_pb2.py | 1 + fennel/gen/metadata_pb2.py | 1 + fennel/gen/pycode_pb2.py | 1 + fennel/gen/schema_pb2.py | 1 + fennel/gen/schema_registry_pb2.py | 1 + fennel/gen/services_pb2.py | 1 + fennel/gen/spec_pb2.py | 1 + fennel/gen/status_pb2.py | 1 + fennel/gen/window_pb2.py | 1 + fennel/lib/to_proto/to_proto.py | 35 +++++++++++++++ fennel/sources/sources.py | 6 +++ fennel/sources/test_invalid_sources.py | 61 ++++++++++++++++++++++++++ fennel/sources/test_sources.py | 7 ++- 23 files changed, 143 insertions(+), 10 deletions(-) diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 48591b6f1..826a27f3a 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [0.18.10] - 2023-10-20 +- Add support for `starting_from` in S3 source. +- ## [0.18.9]- 2023-10-27 - Added support for datetime fields in struct types diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 556c8f89e..02bfa55b0 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -1167,6 +1167,7 @@ def with_source( self, conn: DataConnector, every: Optional[Duration] = None, + starting_from: Optional[datetime.datetime] = None, lateness: Optional[Duration] = None, ): if len(self._pipelines) > 0: @@ -1177,7 +1178,7 @@ def with_source( ds_copy = copy.deepcopy(self) if hasattr(ds_copy, sources.SOURCE_FIELD): delattr(ds_copy, sources.SOURCE_FIELD) - src_fn = source(conn, every, lateness) + src_fn = source(conn, every, starting_from, lateness) return src_fn(ds_copy) def dsschema(self): diff --git a/fennel/gen/auth_pb2.py b/fennel/gen/auth_pb2.py index 5fb3361ee..097f0332d 100644 --- a/fennel/gen/auth_pb2.py +++ b/fennel/gen/auth_pb2.py @@ -19,6 +19,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'auth_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_ORGPERMISSION']._serialized_start=34 _globals['_ORGPERMISSION']._serialized_end=218 diff --git a/fennel/gen/connector_pb2.py b/fennel/gen/connector_pb2.py index 49a4ca2b6..f5de94502 100644 --- a/fennel/gen/connector_pb2.py +++ b/fennel/gen/connector_pb2.py @@ -18,19 +18,20 @@ import fennel.gen.schema_registry_pb2 as schema__registry__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\rkinesis.proto\x1a\x15schema_registry.proto\"\xf4\x03\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 
.fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x42\t\n\x07variant\"\x80\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x42\t\n\x07variant\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xb8\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"n\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\"\x17\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\"j\n\x05MySQL\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\"m\n\x08Postgres\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\">\n\x02S3\x12\x1d\n\x15\x61ws_secret_access_key\x18\x01 \x01(\t\x12\x19\n\x11\x61ws_access_key_id\x18\x02 \x01(\t\"I\n\x08\x42igquery\x12\x0f\n\x07\x64\x61taset\x18\x01 \x01(\t\x12\x18\n\x10\x63redentials_json\x18\x02 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\t\"\x94\x01\n\tSnowflake\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\x10\n\x08password\x18\x03 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x13\n\x0bjdbc_params\x18\x07 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\t \x01(\t\"\x8c\x02\n\x05Kafka\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x1b\n\x13sasl_plain_username\x18\x05 \x01(\t\x12\x1b\n\x13sasl_plain_password\x18\x06 \x01(\t\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x12G\n#enable_ssl_certificate_verification\x18\x08 \x01(\x0b\x32\x1a.google.protobuf.BoolValue\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xfd\x03\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 
\x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"T\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x96\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"T\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"\x8c\x02\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12(\n\x05\x65very\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x04 \x01(\tH\x00\x88\x01\x01\x12+\n\x08lateness\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x06 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x07 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyB\t\n\x07_cursor\"H\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t*?\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\rkinesis.proto\x1a\x15schema_registry.proto\"\xf4\x03\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x42\t\n\x07variant\"\x80\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 
\x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x42\t\n\x07variant\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xb8\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"n\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\"\x17\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\"j\n\x05MySQL\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\"m\n\x08Postgres\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\">\n\x02S3\x12\x1d\n\x15\x61ws_secret_access_key\x18\x01 \x01(\t\x12\x19\n\x11\x61ws_access_key_id\x18\x02 \x01(\t\"I\n\x08\x42igquery\x12\x0f\n\x07\x64\x61taset\x18\x01 \x01(\t\x12\x18\n\x10\x63redentials_json\x18\x02 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\t\"\x94\x01\n\tSnowflake\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\x10\n\x08password\x18\x03 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x13\n\x0bjdbc_params\x18\x07 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\t \x01(\t\"\x8c\x02\n\x05Kafka\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x1b\n\x13sasl_plain_username\x18\x05 \x01(\t\x12\x1b\n\x13sasl_plain_password\x18\x06 \x01(\t\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x12G\n#enable_ssl_certificate_verification\x18\x08 \x01(\x0b\x32\x1a.google.protobuf.BoolValue\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xfd\x03\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"T\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x96\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 
\x01(\x08\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"T\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"\xbf\x02\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12(\n\x05\x65very\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x04 \x01(\tH\x00\x88\x01\x01\x12+\n\x08lateness\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x06 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x07 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\x08 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07_cursor\"H\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'connector_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _KAFKA.fields_by_name['sasl_jaas_config']._options = None _KAFKA.fields_by_name['sasl_jaas_config']._serialized_options = b'\030\001' _KAFKA.fields_by_name['group_id']._options = None _KAFKA.fields_by_name['group_id']._serialized_options = b'\030\001' - _globals['_CDCSTRATEGY']._serialized_start=3716 - _globals['_CDCSTRATEGY']._serialized_end=3779 + _globals['_CDCSTRATEGY']._serialized_start=3767 + _globals['_CDCSTRATEGY']._serialized_end=3842 _globals['_EXTDATABASE']._serialized_start=179 _globals['_EXTDATABASE']._serialized_end=679 _globals['_KAFKAFORMAT']._serialized_start=682 @@ -78,7 +79,7 @@ _globals['_KINESISSTREAM']._serialized_start=3158 _globals['_KINESISSTREAM']._serialized_end=3369 _globals['_SOURCE']._serialized_start=3372 - _globals['_SOURCE']._serialized_end=3640 - _globals['_SINK']._serialized_start=3642 - _globals['_SINK']._serialized_end=3714 + _globals['_SOURCE']._serialized_end=3691 + _globals['_SINK']._serialized_start=3693 + _globals['_SINK']._serialized_end=3765 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/connector_pb2.pyi b/fennel/gen/connector_pb2.pyi index cd3c5be54..960881765 100644 --- a/fennel/gen/connector_pb2.pyi +++ b/fennel/gen/connector_pb2.pyi @@ -40,6 +40,7 @@ class _CDCStrategyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._En Upsert: _CDCStrategy.ValueType # 1 Debezium: _CDCStrategy.ValueType # 2 Native: _CDCStrategy.ValueType # 3 + Delete: 
_CDCStrategy.ValueType # 4 class CDCStrategy(_CDCStrategy, metaclass=_CDCStrategyEnumTypeWrapper): ... @@ -47,6 +48,7 @@ Append: CDCStrategy.ValueType # 0 Upsert: CDCStrategy.ValueType # 1 Debezium: CDCStrategy.ValueType # 2 Native: CDCStrategy.ValueType # 3 +Delete: CDCStrategy.ValueType # 4 global___CDCStrategy = CDCStrategy @typing_extensions.final @@ -655,6 +657,7 @@ class Source(google.protobuf.message.Message): LATENESS_FIELD_NUMBER: builtins.int TIMESTAMP_FIELD_FIELD_NUMBER: builtins.int CDC_FIELD_NUMBER: builtins.int + STARTING_FROM_FIELD_NUMBER: builtins.int @property def table(self) -> global___ExtTable: ... dataset: builtins.str @@ -665,6 +668,8 @@ class Source(google.protobuf.message.Message): def lateness(self) -> google.protobuf.duration_pb2.Duration: ... timestamp_field: builtins.str cdc: global___CDCStrategy.ValueType + @property + def starting_from(self) -> google.protobuf.timestamp_pb2.Timestamp: ... def __init__( self, *, @@ -675,9 +680,10 @@ class Source(google.protobuf.message.Message): lateness: google.protobuf.duration_pb2.Duration | None = ..., timestamp_field: builtins.str = ..., cdc: global___CDCStrategy.ValueType = ..., + starting_from: google.protobuf.timestamp_pb2.Timestamp | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cursor", b"cursor", "every", b"every", "lateness", b"lateness", "table", b"table"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "every", b"every", "lateness", b"lateness", "table", b"table", "timestamp_field", b"timestamp_field"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cursor", b"cursor", "every", b"every", "lateness", b"lateness", "starting_from", b"starting_from", "table", b"table"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "every", b"every", "lateness", b"lateness", "starting_from", b"starting_from", "table", b"table", "timestamp_field", b"timestamp_field"]) -> None: ... def WhichOneof(self, oneof_group: typing_extensions.Literal["_cursor", b"_cursor"]) -> typing_extensions.Literal["cursor"] | None: ... 
global___Source = Source diff --git a/fennel/gen/dataset_pb2.py b/fennel/gen/dataset_pb2.py index aa173f831..77def50d5 100644 --- a/fennel/gen/dataset_pb2.py +++ b/fennel/gen/dataset_pb2.py @@ -24,6 +24,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dataset_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _COREDATASET_FIELDMETADATAENTRY._options = None _COREDATASET_FIELDMETADATAENTRY._serialized_options = b'8\001' diff --git a/fennel/gen/expectations_pb2.py b/fennel/gen/expectations_pb2.py index 864b2c4cc..625f49c05 100644 --- a/fennel/gen/expectations_pb2.py +++ b/fennel/gen/expectations_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'expectations_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_EXPECTATIONS']._serialized_start=66 _globals['_EXPECTATIONS']._serialized_end=385 diff --git a/fennel/gen/featureset_pb2.py b/fennel/gen/featureset_pb2.py index 5728c1c73..1d1357f1b 100644 --- a/fennel/gen/featureset_pb2.py +++ b/fennel/gen/featureset_pb2.py @@ -22,6 +22,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'featureset_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_EXTRACTORTYPE']._serialized_start=1204 _globals['_EXTRACTORTYPE']._serialized_end=1255 diff --git a/fennel/gen/format_pb2.py b/fennel/gen/format_pb2.py index 459bc2571..f7d3eb19c 100644 --- a/fennel/gen/format_pb2.py +++ b/fennel/gen/format_pb2.py @@ -19,6 +19,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'format_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_FILEFORMAT']._serialized_start=38 _globals['_FILEFORMAT']._serialized_end=474 diff --git a/fennel/gen/http_auth_pb2.py b/fennel/gen/http_auth_pb2.py index a51d66082..3f669cad2 100644 --- a/fennel/gen/http_auth_pb2.py +++ b/fennel/gen/http_auth_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'http_auth_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_HTTPAUTHENTICATION']._serialized_start=76 _globals['_HTTPAUTHENTICATION']._serialized_end=231 diff --git a/fennel/gen/kinesis_pb2.py b/fennel/gen/kinesis_pb2.py index 520ff7fd2..f2074a3b4 100644 --- a/fennel/gen/kinesis_pb2.py +++ b/fennel/gen/kinesis_pb2.py @@ -19,6 +19,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'kinesis_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_INITPOSITION']._serialized_start=39 _globals['_INITPOSITION']._serialized_end=101 diff --git a/fennel/gen/metadata_pb2.py b/fennel/gen/metadata_pb2.py index d9d355cc9..f46969763 100644 --- a/fennel/gen/metadata_pb2.py +++ b/fennel/gen/metadata_pb2.py @@ -19,6 +19,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'metadata_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_METADATA']._serialized_start=41 _globals['_METADATA']._serialized_end=138 diff --git a/fennel/gen/pycode_pb2.py 
b/fennel/gen/pycode_pb2.py index 53cde992b..07b72b79c 100644 --- a/fennel/gen/pycode_pb2.py +++ b/fennel/gen/pycode_pb2.py @@ -19,6 +19,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'pycode_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _PYCODE_REFINCLUDESENTRY._options = None _PYCODE_REFINCLUDESENTRY._serialized_options = b'8\001' diff --git a/fennel/gen/schema_pb2.py b/fennel/gen/schema_pb2.py index bfa95d699..ff2246d25 100644 --- a/fennel/gen/schema_pb2.py +++ b/fennel/gen/schema_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'schema_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_DATATYPE']._serialized_start=71 _globals['_DATATYPE']._serialized_end=807 diff --git a/fennel/gen/schema_registry_pb2.py b/fennel/gen/schema_registry_pb2.py index 5bfb5b171..314f9cb23 100644 --- a/fennel/gen/schema_registry_pb2.py +++ b/fennel/gen/schema_registry_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'schema_registry_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_REGISTRYPROVIDER']._serialized_start=237 _globals['_REGISTRYPROVIDER']._serialized_end=270 diff --git a/fennel/gen/services_pb2.py b/fennel/gen/services_pb2.py index 23a0fb87f..8b067966b 100644 --- a/fennel/gen/services_pb2.py +++ b/fennel/gen/services_pb2.py @@ -23,6 +23,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'services_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_SYNCREQUEST']._serialized_start=112 _globals['_SYNCREQUEST']._serialized_end=664 diff --git a/fennel/gen/spec_pb2.py b/fennel/gen/spec_pb2.py index bcb1c7a42..02ddbff9b 100644 --- a/fennel/gen/spec_pb2.py +++ b/fennel/gen/spec_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'spec_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_PRESPEC']._serialized_start=48 _globals['_PRESPEC']._serialized_end=413 diff --git a/fennel/gen/status_pb2.py b/fennel/gen/status_pb2.py index 45d0b4e01..945586fec 100644 --- a/fennel/gen/status_pb2.py +++ b/fennel/gen/status_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'status_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None DESCRIPTOR._serialized_options = b'\n\016com.google.rpcB\013StatusProtoP\001Z7google.golang.org/genproto/googleapis/rpc/status;status\370\001\001\242\002\003RPC' _globals['_STATUS']._serialized_start=64 diff --git a/fennel/gen/window_pb2.py b/fennel/gen/window_pb2.py index d59e18eca..a711fde19 100644 --- a/fennel/gen/window_pb2.py +++ b/fennel/gen/window_pb2.py @@ -20,6 +20,7 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'window_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None _globals['_WINDOW']._serialized_start=69 _globals['_WINDOW']._serialized_end=186 diff --git a/fennel/lib/to_proto/to_proto.py 
b/fennel/lib/to_proto/to_proto.py index 0fedda953..0e0160e29 100644 --- a/fennel/lib/to_proto/to_proto.py +++ b/fennel/lib/to_proto/to_proto.py @@ -516,6 +516,10 @@ def _kafka_conn_to_source_proto( data_source.sasl_plain_password, data_source.verify_cert, ) + if connector.starting_from is not None: + raise ValueError( + "KafkaConnector does not support starting_from parameter" + ) source = connector_proto.Source( table=connector_proto.ExtTable( kafka_topic=connector_proto.KafkaTopic( @@ -581,11 +585,21 @@ def _s3_conn_to_source_proto( format=connector.format, presorted=connector.presorted, ) + if connector.starting_from is not None: + if connector.format != "delta": + raise ValueError( + "Only delta format supports starting_from parameter" + ) + starting_from = Timestamp() + starting_from.FromDatetime(connector.starting_from) + else: + starting_from = None source = connector_proto.Source( table=ext_table, dataset=dataset_name, every=to_duration_proto(connector.every), lateness=to_duration_proto(connector.lateness), + starting_from=starting_from, cursor=None, timestamp_field=timestamp_field, cdc=to_cdc_proto(connector.cdc), @@ -672,6 +686,10 @@ def _bigquery_conn_to_source_proto( ext_db, table_name=connector.table_name, ) + if connector.starting_from is not None: + raise ValueError( + "BigQuery connector does not support starting_from parameter" + ) return ( ext_db, connector_proto.Source( @@ -733,6 +751,10 @@ def _snowflake_conn_to_source_proto( ext_table = _snowflake_to_ext_table_proto( db=ext_db, table_name=connector.table_name ) + if connector.starting_from is not None: + raise ValueError( + "Snowflake connector does not support starting_from parameter" + ) return ( ext_db, connector_proto.Source( @@ -808,6 +830,11 @@ def _mysql_conn_to_source_proto( ext_table = _mysql_to_ext_table_proto( db=ext_db, table_name=connector.table_name ) + + if connector.starting_from is not None: + raise ValueError( + "MySQL connector does not support starting_from parameter" + ) return ( ext_db, connector_proto.Source( @@ -887,6 +914,10 @@ def _pg_conn_to_source_proto( ext_table = _pg_to_ext_table_proto( db=ext_db, table_name=connector.table_name ) + if connector.starting_from is not None: + raise ValueError( + "Postgres connector does not support starting_from parameter" + ) return ( ext_db, connector_proto.Source( @@ -965,6 +996,10 @@ def _kinesis_conn_to_source_proto( init_timestamp=connector.init_timestamp, format=connector.format, ) + if connector.starting_from is not None: + raise ValueError( + "Kinesis connector does not support starting_from parameter" + ) return ( ext_db, connector_proto.Source( diff --git a/fennel/sources/sources.py b/fennel/sources/sources.py index 86447a729..6a7f99f5c 100644 --- a/fennel/sources/sources.py +++ b/fennel/sources/sources.py @@ -28,6 +28,7 @@ def source( conn: DataConnector, every: Optional[Duration] = None, + starting_from: Optional[datetime] = None, lateness: Optional[Duration] = None, cdc: Optional[str] = None, ) -> Callable[[T], Any]: @@ -39,10 +40,14 @@ def source( f"{', '.join(conn.required_fields())}." 
) + if starting_from is not None and not isinstance(starting_from, datetime): + raise TypeError("starting_from must be of type datetime") + def decorator(dataset_cls: T): conn.every = every if every is not None else DEFAULT_EVERY conn.lateness = lateness if lateness is not None else DEFAULT_LATENESS conn.cdc = cdc if cdc is not None else DEFAULT_CDC + conn.starting_from = starting_from if hasattr(dataset_cls, SOURCE_FIELD): raise Exception( "Multiple sources are not supported in dataset `%s`." @@ -360,6 +365,7 @@ class DataConnector: every: Duration lateness: Duration cdc: str + starting_from: Optional[datetime] = None def identifier(self): raise NotImplementedError diff --git a/fennel/sources/test_invalid_sources.py b/fennel/sources/test_invalid_sources.py index 443e76682..2f4a1c5b4 100644 --- a/fennel/sources/test_invalid_sources.py +++ b/fennel/sources/test_invalid_sources.py @@ -282,6 +282,67 @@ def test_invalid_kaffa_security_protocol(): assert "security protocol must be one of" in str(e.value) +bigquery = BigQuery( + name="bq_movie_tags", + project_id="gold-cocoa-356105", + dataset_id="movie_tags", + credentials_json="{}", +) + + +def test_invalid_starting_from(): + with pytest.raises(Exception) as e: + + @meta(owner="test@test.com") + @source( + bigquery.table("users_bq", cursor="added_on"), + every="1h", + lateness="2h", + starting_from=datetime.now(), + ) + @dataset + class UserInfoDatasetBigQuery: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + age: int + account_creation_date: datetime + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # bigquery source + view = InternalTestClient() + view.add(UserInfoDatasetBigQuery) + _sync_request = view._get_sync_request_proto() # noqa + assert "BigQuery connector does not support starting_from parameter" in str( + e.value + ) + + with pytest.raises(Exception) as e: + + @source( + s3.bucket(bucket_name="bucket", prefix="prefix"), + every="1h", + starting_from="2020-01-01T00:00:00Z", + ) + @meta(owner="aditya@fennel.ai") + @dataset + class UserInfoDataset: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + age: int + account_creation_date: datetime + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + assert "starting_from must be of type datetime" in str(e.value) + + def test_invalid_s3_format(): with pytest.raises(Exception) as e: s3.bucket(bucket_name="bucket", prefix="prefix", format="py") diff --git a/fennel/sources/test_sources.py b/fennel/sources/test_sources.py index 99dcd72cb..9053671d1 100644 --- a/fennel/sources/test_sources.py +++ b/fennel/sources/test_sources.py @@ -231,9 +231,13 @@ def test_multiple_sources(): bucket_name="all_ratings", prefix="prod/apac/", presorted=True, + format="delta", ), every="1h", lateness="2d", + starting_from=datetime.strptime( + "2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ" + ), ) @dataset class UserInfoDatasetS3: @@ -262,7 +266,7 @@ class UserInfoDatasetS3: "bucket": "all_ratings", "pathPrefix": "prod/apac/", "delimiter": ",", - "format": "csv", + "format": "delta", "preSorted": True, "db": { "name": "ratings_source", @@ -276,6 +280,7 @@ class UserInfoDatasetS3: "dataset": "UserInfoDatasetS3", "every": "3600s", "lateness": "172800s", + "startingFrom": "2021-08-10T00:00:00Z", "timestamp_field": "timestamp", } expected_source_request = ParseDict(s, connector_proto.Source()) From 2df1063f8dbfba67c9f83006392cd02bb3629b43 Mon Sep 17 00:00:00 2001 From: Aditya Nambiar 
Date: Mon, 23 Oct 2023 11:29:25 -0700 Subject: [PATCH 2/2] Rename starting_from to since --- fennel/CHANGELOG.md | 6 +++--- fennel/sources/sources.py | 6 +++--- fennel/sources/test_invalid_sources.py | 4 ++-- fennel/sources/test_sources.py | 4 +--- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 826a27f3a..a2a761220 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,8 +1,8 @@ # Changelog -## [0.18.10] - 2023-10-20 -- Add support for `starting_from` in S3 source. -- +## [0.18.10] - 2023-10-30 +- Add support for `since` in S3 source. + ## [0.18.9]- 2023-10-27 - Added support for datetime fields in struct types diff --git a/fennel/sources/sources.py b/fennel/sources/sources.py index 6a7f99f5c..8d2f28bd4 100644 --- a/fennel/sources/sources.py +++ b/fennel/sources/sources.py @@ -28,7 +28,7 @@ def source( conn: DataConnector, every: Optional[Duration] = None, - starting_from: Optional[datetime] = None, + since: Optional[datetime] = None, lateness: Optional[Duration] = None, cdc: Optional[str] = None, ) -> Callable[[T], Any]: @@ -40,14 +40,14 @@ def source( f"{', '.join(conn.required_fields())}." ) - if starting_from is not None and not isinstance(starting_from, datetime): + if since is not None and not isinstance(since, datetime): raise TypeError("starting_from must be of type datetime") def decorator(dataset_cls: T): conn.every = every if every is not None else DEFAULT_EVERY conn.lateness = lateness if lateness is not None else DEFAULT_LATENESS conn.cdc = cdc if cdc is not None else DEFAULT_CDC - conn.starting_from = starting_from + conn.starting_from = since if hasattr(dataset_cls, SOURCE_FIELD): raise Exception( "Multiple sources are not supported in dataset `%s`." diff --git a/fennel/sources/test_invalid_sources.py b/fennel/sources/test_invalid_sources.py index 2f4a1c5b4..62cd4b9d6 100644 --- a/fennel/sources/test_invalid_sources.py +++ b/fennel/sources/test_invalid_sources.py @@ -298,7 +298,7 @@ def test_invalid_starting_from(): bigquery.table("users_bq", cursor="added_on"), every="1h", lateness="2h", - starting_from=datetime.now(), + since=datetime.now(), ) @dataset class UserInfoDatasetBigQuery: @@ -325,7 +325,7 @@ class UserInfoDatasetBigQuery: @source( s3.bucket(bucket_name="bucket", prefix="prefix"), every="1h", - starting_from="2020-01-01T00:00:00Z", + since="2020-01-01T00:00:00Z", ) @meta(owner="aditya@fennel.ai") @dataset diff --git a/fennel/sources/test_sources.py b/fennel/sources/test_sources.py index 9053671d1..46493b27b 100644 --- a/fennel/sources/test_sources.py +++ b/fennel/sources/test_sources.py @@ -235,9 +235,7 @@ def test_multiple_sources(): ), every="1h", lateness="2d", - starting_from=datetime.strptime( - "2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ" - ), + since=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"), ) @dataset class UserInfoDatasetS3:
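
For readers of this patch series, a minimal usage sketch of the feature it adds follows. The sketch is assembled from the tests touched above, not taken from the commits themselves: the import paths, the S3 credentials, and the dataset fields are assumptions. Note that only the user-facing keyword is renamed to `since` by the second commit; the connector attribute and the proto field are still called `starting_from`. Per the `_s3_conn_to_source_proto` change, `since` is only honored for S3 sources in "delta" format, and the other connectors touched in to_proto.py (Kafka, BigQuery, Snowflake, MySQL, Postgres, Kinesis) raise a ValueError when the sync request is built if it is set.

from datetime import datetime

from fennel.datasets import dataset, field  # assumed import paths
from fennel.sources import S3, source

# Placeholder connector; name and credentials are illustrative only.
s3 = S3(
    name="ratings_source",
    aws_access_key_id="<access-key-id>",
    aws_secret_access_key="<secret-access-key>",
)


@source(
    # `since` is only supported for S3 sources using the "delta" format;
    # other connectors reject it with a ValueError at sync time.
    s3.bucket(bucket_name="all_ratings", prefix="prod/apac/", format="delta"),
    every="1h",
    lateness="2d",
    since=datetime(2021, 8, 10),  # ingest rows with timestamps at or after this instant
)
@dataset
class UserRatings:
    user_id: int = field(key=True)
    rating: float
    timestamp: datetime = field(timestamp=True)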