DEV-3403 Introduce until to the client (#394)
* Add test_lib to gitignore since gen_rust_lib changes on int test building
zsid60 authored Mar 1, 2024
1 parent 5ff6599 commit c81f254
Showing 13 changed files with 387 additions and 288 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
**/.DS_Store
fennel/proto/*.proto
fennel/test_lib/
output.txt

# Byte-compiled / optimized / DLL files
1 change: 0 additions & 1 deletion docs/examples/api-reference/rest-api.py
@@ -8,7 +8,6 @@


class TestRestAPI(unittest.TestCase):

@patch("requests.post")
def test_log(self, mock_post):
mock_post.return_value.status_code = 200
1 change: 1 addition & 0 deletions docs/examples/api-reference/sources/source.py
@@ -28,6 +28,7 @@ def test_source_decorator(client):
cdc="append",
disorder="2d",
since=datetime(2021, 1, 1, 3, 30, 0), # 3:30 AM on 1st Jan 2021
until=datetime(2022, 1, 1, 0, 0, 0), # 12:00 AM on 1st Jan 2022
preproc={
"uid": ref("user_id"), # 'uid' comes from column 'user_id'
"country": "USA", # country for every row should become 'USA'
10 changes: 10 additions & 0 deletions docs/pages/api-reference/sources/source.md
@@ -33,6 +33,16 @@ several cases, it's still necessary to read all the data before rejecting rows
that are older than `since`.
</Expandable>

<Expandable title="until" type="Optional[datetime]" defaultVal="None">
When `until` is set, the source only admits those rows where the value
corresponding to the `timestamp` column of the dataset is < `until`.

Fennel reads as little data as possible given this constraint - for instance, when
reading parquet files, the filter is pushed all the way down. However, in
several cases, it's still necessary to read all the data before rejecting rows
that are newer than `until`.
</Expandable>
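
A hedged, minimal sketch of a source bounded on both ends; the import paths, connector setup, and dataset are illustrative assumptions, not part of this commit:

```python
from datetime import datetime
from fennel.datasets import dataset, field
from fennel.lib.metadata import meta
from fennel.sources import source, S3

s3 = S3(name="my_s3")  # hypothetical connector; credentials omitted

@source(
    s3.bucket(bucket_name="bucket", prefix="prefix"),
    every="1h",
    disorder="14d",
    cdc="append",
    since=datetime(2021, 1, 1),  # admit rows with timestamp at or after `since`
    until=datetime(2022, 1, 1),  # admit rows with timestamp strictly before `until`
)
@meta(owner="[email protected]")
@dataset
class UserInfoDataset:
    user_id: int = field(key=True)
    timestamp: datetime = field(timestamp=True)
```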

<Expandable title="disorder" type="Duration">
Specifies how out of order data from this source can arrive.

7 changes: 7 additions & 0 deletions docs/pages/concepts/source.md
@@ -136,6 +136,13 @@ ingest data from a specific time onwards. The `since` field allows us to do that

The `since` field is a `datetime` instance.

### Until
The `until` field provides a way to stop ingestion from a source at a particular timestamp. Like `since`, it must be a `datetime` instance.

When deriving datasets that are unions of batch and realtime sources, `until` and `since` can be used to control when
to switch between sources. For example, read from S3 `until` a timestamp, and from that timestamp onwards, read from Kafka
using `since` - a sketch of this handoff follows below.
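
A hedged sketch of that handoff; the connector construction, bucket/topic names, and Kafka parameters are assumptions for illustration:

```python
from datetime import datetime, timezone
from fennel.datasets import dataset, field
from fennel.sources import source, Kafka, S3

# Hypothetical connectors; real credentials/config omitted.
s3 = S3(name="batch_s3")
kafka = Kafka(name="realtime_kafka", bootstrap_servers="localhost:9092")

CUTOVER = datetime(2024, 1, 1, tzinfo=timezone.utc)  # hypothetical switch point

@source(
    s3.bucket(bucket_name="clicks-archive", prefix="daily"),
    every="1h",
    disorder="14d",
    cdc="append",
    until=CUTOVER,  # batch history ends here (exclusive)
)
@dataset
class BatchClicks:
    user_id: int = field(key=True)
    timestamp: datetime = field(timestamp=True)

@source(kafka.topic("clicks"), disorder="1d", cdc="append", since=CUTOVER)
@dataset
class LiveClicks:
    user_id: int = field(key=True)
    timestamp: datetime = field(timestamp=True)
```

A derived dataset can then union `BatchClicks` and `LiveClicks` without double-counting: because `until` is exclusive while `since` admits rows from its timestamp onwards, sharing one cutover instant should leave no gap or overlap at the boundary.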


## Load Impact of Sources

16 changes: 8 additions & 8 deletions fennel/gen/connector_pb2.py

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions fennel/gen/connector_pb2.pyi
@@ -765,6 +765,7 @@ class Source(google.protobuf.message.Message):
VERSION_FIELD_NUMBER: builtins.int
BOUNDED_FIELD_NUMBER: builtins.int
IDLENESS_FIELD_NUMBER: builtins.int
UNTIL_FIELD_NUMBER: builtins.int
@property
def table(self) -> global___ExtTable: ...
dataset: builtins.str
@@ -784,6 +785,8 @@
bounded: builtins.bool
@property
def idleness(self) -> google.protobuf.duration_pb2.Duration: ...
@property
def until(self) -> google.protobuf.timestamp_pb2.Timestamp: ...
def __init__(
self,
*,
@@ -800,9 +803,10 @@
version: builtins.int = ...,
bounded: builtins.bool = ...,
idleness: google.protobuf.duration_pb2.Duration | None = ...,
until: google.protobuf.timestamp_pb2.Timestamp | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_idleness", b"_idleness", "cursor", b"cursor", "disorder", b"disorder", "every", b"every", "idleness", b"idleness", "starting_from", b"starting_from", "table", b"table"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_idleness", b"_idleness", "bounded", b"bounded", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "disorder", b"disorder", "ds_version", b"ds_version", "every", b"every", "idleness", b"idleness", "pre_proc", b"pre_proc", "starting_from", b"starting_from", "table", b"table", "timestamp_field", b"timestamp_field", "version", b"version"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_idleness", b"_idleness", "cursor", b"cursor", "disorder", b"disorder", "every", b"every", "idleness", b"idleness", "starting_from", b"starting_from", "table", b"table", "until", b"until"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_idleness", b"_idleness", "bounded", b"bounded", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "disorder", b"disorder", "ds_version", b"ds_version", "every", b"every", "idleness", b"idleness", "pre_proc", b"pre_proc", "starting_from", b"starting_from", "table", b"table", "timestamp_field", b"timestamp_field", "until", b"until", "version", b"version"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_cursor", b"_cursor"]) -> typing_extensions.Literal["cursor"] | None: ...
@typing.overload
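
Since `until` is a singular message field in the generated bindings, its presence is checked with `HasField`, as the updated stub above shows. A small sanity sketch, assuming `fennel.gen.connector_pb2` is importable as generated:

```python
from google.protobuf.timestamp_pb2 import Timestamp
import fennel.gen.connector_pb2 as connector_proto

src = connector_proto.Source(dataset="UserInfoDataset")
assert not src.HasField("until")  # message fields start out unset
src.until.CopyFrom(Timestamp(seconds=1640995200))  # 2022-01-01T00:00:00Z
assert src.HasField("until")  # set once a Timestamp is copied in
```
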
29 changes: 18 additions & 11 deletions fennel/internal_lib/to_proto/to_proto.py
@@ -675,7 +675,8 @@ def _kafka_conn_to_source_proto(
ds_version=ds_version,
cdc=to_cdc_proto(connector.cdc),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
)
return (ext_db, source)

@@ -739,7 +740,8 @@ def _s3_conn_to_source_proto(
ds_version=ds_version,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
cursor=None,
timestamp_field=timestamp_field,
cdc=to_cdc_proto(connector.cdc),
@@ -844,7 +846,8 @@ def _bigquery_conn_to_source_proto(
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
timestamp_field=timestamp_field,
cdc=to_cdc_proto(connector.cdc),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
@@ -911,7 +914,8 @@ def _snowflake_conn_to_source_proto(
timestamp_field=timestamp_field,
cdc=to_cdc_proto(connector.cdc),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
),
)

@@ -984,7 +988,8 @@ def _mysql_conn_to_source_proto(
disorder=to_duration_proto(connector.disorder),
timestamp_field=timestamp_field,
cdc=to_cdc_proto(connector.cdc),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
),
)
@@ -1067,7 +1072,8 @@ def _pg_conn_to_source_proto(
disorder=to_duration_proto(connector.disorder),
timestamp_field=timestamp_field,
cdc=to_cdc_proto(connector.cdc),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
),
)
@@ -1145,7 +1151,8 @@ def _kinesis_conn_to_source_proto(
ds_version=ds_version,
disorder=to_duration_proto(connector.disorder),
cdc=to_cdc_proto(connector.cdc),
starting_from=_to_starting_from_proto(connector.since),
starting_from=_to_timestamp_proto(connector.since),
until=_to_timestamp_proto(connector.until),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
),
)
@@ -1361,13 +1368,13 @@ def {featureset._name}_{extractor.name}(*args, **kwargs):


# ------------------------------------------------------------------------------
# Since
# Since / Until
# ------------------------------------------------------------------------------


def _to_starting_from_proto(since: Optional[datetime]) -> Optional[Timestamp]:
if since is None:
def _to_timestamp_proto(dt: Optional[datetime]) -> Optional[Timestamp]:
if dt is None:
return None
ts = Timestamp()
ts.FromDatetime(since)
ts.FromDatetime(dt)
return ts
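
For reference, the `Timestamp` round-trip that `_to_timestamp_proto` relies on behaves as follows; a standalone sanity sketch, not part of this commit:

```python
from datetime import datetime, timezone
from google.protobuf.timestamp_pb2 import Timestamp

ts = Timestamp()
ts.FromDatetime(datetime(2022, 1, 1, tzinfo=timezone.utc))
assert ts.seconds == 1640995200  # epoch seconds for 2022-01-01T00:00:00Z
assert ts.ToDatetime() == datetime(2022, 1, 1)  # naive UTC on the way back
```
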
11 changes: 11 additions & 0 deletions fennel/sources/sources.py
@@ -42,6 +42,7 @@ def source(
cdc: str,
every: Optional[Duration] = None,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
tier: Optional[Union[str, List[str]]] = None,
preproc: Optional[Dict[str, PreProcValue]] = None,
) -> Callable[[T], Any]:
@@ -56,9 +57,18 @@
if since is not None and not isinstance(since, datetime):
raise TypeError(f"'since' must be of type datetime - got {type(since)}")

if until is not None and not isinstance(until, datetime):
raise TypeError(f"'until' must be of type datetime - got {type(until)}")

if since is not None and until is not None and since > until:
raise ValueError(
f"'since' ({since}) must be earlier than 'until' ({until})"
)

def decorator(dataset_cls: T):
conn.every = every if every is not None else DEFAULT_EVERY
conn.since = since
conn.until = until
conn.disorder = disorder
conn.cdc = cdc
conn.tiers = TierSelector(tier)
@@ -377,6 +387,7 @@ class DataConnector:
disorder: Duration
cdc: str
since: Optional[datetime] = None
until: Optional[datetime] = None
tiers: TierSelector
pre_proc: Optional[Dict[str, PreProcValue]] = None

53 changes: 53 additions & 0 deletions fennel/sources/test_invalid_sources.py
@@ -328,6 +328,59 @@
)


def test_invalid_until():
with pytest.raises(Exception) as e:

@source(
s3.bucket(bucket_name="bucket", prefix="prefix"),
every="1h",
disorder="14d",
cdc="append",
until="2020-01-01T00:00:00Z",
)
@meta(owner="[email protected]")
@dataset
class UserInfoDataset:
user_id: int = field(key=True)
name: str
gender: str
# User's date of birth
dob: str
age: int
account_creation_date: datetime
country: Optional[str]
timestamp: datetime = field(timestamp=True)

assert "'until' must be of type datetime - got <class 'str'>" == str(
e.value
)

with pytest.raises(ValueError) as e:

@source(
s3.bucket(bucket_name="bucket", prefix="prefix"),
every="1h",
disorder="14d",
cdc="append",
since=datetime(2024, 1, 1),
until=datetime(2023, 12, 1),
)
@meta(owner="[email protected]")
@dataset
class UserInfoDataset2:
user_id: int = field(key=True)
name: str
gender: str
# User's date of birth
dob: str
age: int
account_creation_date: datetime
country: Optional[str]
timestamp: datetime = field(timestamp=True)

assert ("must be earlier than 'until'") in str(e.value)


def test_invalid_s3_format():
with pytest.raises(Exception) as e:
s3.bucket(bucket_name="bucket", prefix="prefix", format="py")
6 changes: 4 additions & 2 deletions fennel/sources/test_sources.py
@@ -527,6 +527,7 @@ def test_multiple_sources():
disorder="2d",
cdc="append",
since=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
until=datetime.strptime("2022-02-28T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
)
@dataset
class UserInfoDatasetS3:
@@ -572,6 +573,7 @@ class UserInfoDatasetS3:
"every": "3600s",
"disorder": "172800s",
"startingFrom": "2021-08-10T00:00:00Z",
"until": "2022-02-28T00:00:00Z",
"timestamp_field": "timestamp",
}
expected_source_request = ParseDict(s, connector_proto.Source())
@@ -668,7 +670,7 @@ class UserInfoDatasetSnowFlake:
every="1h",
disorder="14d",
cdc="append",
since=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
until=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
)
@dataset
class UserInfoDatasetSnowFlakeStartingFrom:
@@ -711,7 +713,7 @@ class UserInfoDatasetSnowFlakeStartingFrom:
"disorder": "1209600s",
"cursor": "added_on",
"timestampField": "timestamp",
"startingFrom": "2021-08-10T00:00:00Z",
"until": "2021-08-10T00:00:00Z",
}
expected_source_request = ParseDict(s, connector_proto.Source())
assert source_request == expected_source_request, error_message(