Skip to content

Commit

Permalink
source: Support since param in sources (#286)
Browse files Browse the repository at this point in the history
* source: Support starting_from param in sources

* Rename starting_from to sinc
  • Loading branch information
aditya-nambiar authored Nov 7, 2023
1 parent 6da236e commit 832b194
Show file tree
Hide file tree
Showing 23 changed files with 141 additions and 10 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [0.18.10] - 2023-10-30
- Add support for `since` in S3 source.

## [0.18.9]- 2023-10-27
- Added support for datetime fields in struct types

Expand Down
3 changes: 2 additions & 1 deletion fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,7 @@ def with_source(
self,
conn: DataConnector,
every: Optional[Duration] = None,
starting_from: Optional[datetime.datetime] = None,
lateness: Optional[Duration] = None,
):
if len(self._pipelines) > 0:
Expand All @@ -1177,7 +1178,7 @@ def with_source(
ds_copy = copy.deepcopy(self)
if hasattr(ds_copy, sources.SOURCE_FIELD):
delattr(ds_copy, sources.SOURCE_FIELD)
src_fn = source(conn, every, lateness)
src_fn = source(conn, every, starting_from, lateness)
return src_fn(ds_copy)

def dsschema(self):
Expand Down
1 change: 1 addition & 0 deletions fennel/gen/auth_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions fennel/gen/connector_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions fennel/gen/connector_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class _CDCStrategyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._En
Upsert: _CDCStrategy.ValueType # 1
Debezium: _CDCStrategy.ValueType # 2
Native: _CDCStrategy.ValueType # 3
Delete: _CDCStrategy.ValueType # 4

class CDCStrategy(_CDCStrategy, metaclass=_CDCStrategyEnumTypeWrapper): ...

Append: CDCStrategy.ValueType # 0
Upsert: CDCStrategy.ValueType # 1
Debezium: CDCStrategy.ValueType # 2
Native: CDCStrategy.ValueType # 3
Delete: CDCStrategy.ValueType # 4
global___CDCStrategy = CDCStrategy

@typing_extensions.final
Expand Down Expand Up @@ -655,6 +657,7 @@ class Source(google.protobuf.message.Message):
LATENESS_FIELD_NUMBER: builtins.int
TIMESTAMP_FIELD_FIELD_NUMBER: builtins.int
CDC_FIELD_NUMBER: builtins.int
STARTING_FROM_FIELD_NUMBER: builtins.int
@property
def table(self) -> global___ExtTable: ...
dataset: builtins.str
Expand All @@ -665,6 +668,8 @@ class Source(google.protobuf.message.Message):
def lateness(self) -> google.protobuf.duration_pb2.Duration: ...
timestamp_field: builtins.str
cdc: global___CDCStrategy.ValueType
@property
def starting_from(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
def __init__(
self,
*,
Expand All @@ -675,9 +680,10 @@ class Source(google.protobuf.message.Message):
lateness: google.protobuf.duration_pb2.Duration | None = ...,
timestamp_field: builtins.str = ...,
cdc: global___CDCStrategy.ValueType = ...,
starting_from: google.protobuf.timestamp_pb2.Timestamp | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cursor", b"cursor", "every", b"every", "lateness", b"lateness", "table", b"table"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "every", b"every", "lateness", b"lateness", "table", b"table", "timestamp_field", b"timestamp_field"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cursor", b"cursor", "every", b"every", "lateness", b"lateness", "starting_from", b"starting_from", "table", b"table"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "every", b"every", "lateness", b"lateness", "starting_from", b"starting_from", "table", b"table", "timestamp_field", b"timestamp_field"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["_cursor", b"_cursor"]) -> typing_extensions.Literal["cursor"] | None: ...

global___Source = Source
Expand Down
1 change: 1 addition & 0 deletions fennel/gen/dataset_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fennel/gen/expectations_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fennel/gen/featureset_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fennel/gen/format_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fennel/gen/http_auth_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fennel/gen/kinesis_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 832b194

Please sign in to comment.