Add changelog operator (#578)
In this change we add a changelog operator, which converts a deltaframe into an append-only
changelog, with keys and kinds stored in value columns.
satrana42 authored Oct 15, 2024
1 parent b9fa39b commit 44456e0
Showing 10 changed files with 263 additions and 94 deletions.
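For orientation before the file-by-file diff, a hedged sketch of the new operator in use, mirroring the tests in this change. The import paths and webhook wiring are assumptions; the commit itself only shows the operator and its tests.

from datetime import datetime

from fennel.connectors import Webhook, source   # assumed import paths
from fennel.datasets import Dataset, dataset, field, pipeline
from fennel.lib import inputs

webhook = Webhook(name="fennel_webhook")

@dataset
@source(webhook.endpoint("JobOpening"), disorder="14d", cdc="append")
class JobOpening:
    creater_id: int
    job_id: int
    creation_ts: datetime

@dataset(index=True)
class JobRank:
    creater_id: int = field(key=True)
    job_id: int = field(key=True)
    is_delete: bool = field(key=True)
    creation_ts: datetime

    @pipeline
    @inputs(JobOpening)
    def rank_job(cls, job_opening: Dataset):
        return (
            job_opening.groupby("creater_id")
            .latest()                       # keyed dataset: latest job per creater
            .changelog(delete="is_delete")  # append-only changelog; former keys
                                            # and the bool kind flag become values
            .groupby("creater_id", "job_id", "is_delete")
            .latest()
        )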
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [1.5.40] - 2024-10-15
- Add changelog operator

## [1.5.39] - 2024-10-14
- Add support for Snowflake sink

121 changes: 102 additions & 19 deletions fennel/client_tests/test_dataset.py
@@ -4619,9 +4619,11 @@ def pipeline(cls, event: Dataset):
assert results["value"].tolist() == [[1], [2], [3]]


@pytest.mark.integration
@mock
def test_unkey_operator(client):
def test_changelog_operator(client):
@dataset
@source(webhook.endpoint("JobOpening"), disorder="14d", cdc="append")
class JobOpening:
creater_id: int
job_id: int
@@ -4630,25 +4632,23 @@ class JobOpening:
@dataset(index=True)
class JobRank:
creater_id: int = field(key=True)
rank: int = field(key=True)
job_id: int = field(key=True)
is_delete: bool = field(key=True)
creation_ts: datetime

@pipeline
@inputs(JobOpening)
def rank_job(cls, job_opening: Dataset):
return (
job_opening.groupby("creater_id")
.aggregate(
rank=Count(window=Continuous("forever")), emit="final"
)
.changelog("del")
.filter(lambda df: ~df["del"] & (df["rank"] < 4))
.drop("del")
.groupby("creater_id", "rank")
.latest()
.changelog(delete="is_delete")
.groupby("creater_id", "job_id", "is_delete")
.latest()
)

client.commit(datasets=[JobOpening, JobRank], message="test")

# Creation timestamps arrive one every 2 hours
creation_ts = [
datetime(2022, 1, 1, 2, 0, 0),
@@ -4658,30 +4658,113 @@ def rank_job(cls, job_opening: Dataset):
datetime(2022, 1, 1, 10, 0, 0),
datetime(2022, 1, 1, 12, 0, 0),
]

openings = pd.DataFrame(
{
"creater_id": [1, 2, 1, 1, 1, 1],
"job_id": [1, 2, 3, 4, 5, 6],
"creation_ts": creation_ts,
}
)
log(JobOpening, openings)
client.log("fennel_webhook", "JobOpening", openings)

if client.is_integration_client():
client.sleep(60)

is_deletes = [True, False, True, True, True, False]

results, found = client.lookup(
"JobRank",
JobRank,
keys=pd.DataFrame(
{"creater_id": [1, 1, 1, 1, 2], "rank": [4, 3, 2, 1, 1]}
{
"creater_id": [1, 2, 1, 1, 1, 1],
"job_id": [1, 2, 3, 4, 5, 6],
"is_delete": is_deletes,
}
),
)
assert found.tolist() == [False, True, True, True, True]
assert results.shape == (5, 3)
assert results["rank"].tolist() == [4, 3, 2, 1, 1]
assert results["creation_ts"].tolist() == [
pd.NaT,
pd.Timestamp("2022-01-01 08:00:00", tz="UTC"),

creation_ts_expected = [
pd.Timestamp("2022-01-01 06:00:00", tz="UTC"),
pd.Timestamp("2022-01-01 04:00:00", tz="UTC"),
pd.Timestamp("2022-01-01 08:00:00", tz="UTC"),
pd.Timestamp("2022-01-01 10:00:00", tz="UTC"),
pd.Timestamp("2022-01-01 12:00:00", tz="UTC"),
pd.Timestamp("2022-01-01 12:00:00", tz="UTC"),
]
assert found.tolist() == [True, True, True, True, True, True]
assert results.shape == (6, 4)
assert results["creation_ts"].tolist() == creation_ts_expected


@pytest.mark.integration
@mock
def test_changelog_operator_insert_identity(client):
@dataset
@source(webhook.endpoint("JobOpening"), disorder="14d", cdc="append")
class JobOpening:
creater_id: int
job_id: int
creation_ts: datetime

@dataset(index=True)
class JobRank:
creater_id: int = field(key=True)
job_id: int = field(key=True)
creation_ts: datetime

@pipeline
@inputs(JobOpening)
def rank_job(cls, job_opening: Dataset):
return (
job_opening.groupby("creater_id")
.latest()
.changelog(insert="is_insert")
.filter(lambda df: df["is_insert"])
.drop("is_insert")
.groupby("creater_id", "job_id")
.latest()
)

client.commit(datasets=[JobOpening, JobRank], message="test")

# Creation timestamps arrive one every 2 hours
creation_ts = [
datetime(2022, 1, 1, 2, 0, 0),
datetime(2022, 1, 1, 4, 0, 0),
datetime(2022, 1, 1, 6, 0, 0),
datetime(2022, 1, 1, 8, 0, 0),
datetime(2022, 1, 1, 10, 0, 0),
datetime(2022, 1, 1, 12, 0, 0),
]

openings = pd.DataFrame(
{
"creater_id": [1, 2, 1, 1, 1, 1],
"job_id": [1, 2, 3, 4, 5, 6],
"creation_ts": creation_ts,
}
)
client.log("fennel_webhook", "JobOpening", openings)

if client.is_integration_client():
client.sleep(60)

results, found = client.lookup(
JobRank,
keys=pd.DataFrame(
{
"creater_id": [1, 2, 1, 1, 1, 1],
"job_id": [1, 2, 3, 4, 5, 6],
}
),
)

creation_ts_expected = [pd.Timestamp(d, tz="UTC") for d in creation_ts]

assert found.tolist() == [True, True, True, True, True, True]
assert results.shape == (6, 3)
assert results["creation_ts"].tolist() == creation_ts_expected


webhook = Webhook(name="fennel_webhook")
32 changes: 24 additions & 8 deletions fennel/datasets/datasets.py
@@ -371,8 +371,8 @@ def explode(self, *args, columns: List[str] = None) -> _Node:
columns = _Node.__get_list_args(*args, columns=columns, name="explode")
return Explode(self, columns)

def changelog(self, delete_column: str) -> _Node:
return Changelog(self, delete_column)
def changelog(self, **kwargs) -> _Node:
return Changelog(self, **kwargs)

def isignature(self):
raise NotImplementedError
@@ -579,22 +579,38 @@ def dsschema(self):


class Changelog(_Node):
def __init__(self, node: _Node, delete_column: str):
def __init__(self, node: _Node, **kwargs):
super().__init__()
self.node = node
self.delete_column = delete_column

delete_column = kwargs.get("delete")
insert_column = kwargs.get("insert")
if delete_column is None and insert_column is None:
raise ValueError("Either delete or insert column must be specified")
elif delete_column is not None and insert_column is not None:
raise ValueError(
"Only one of delete or insert column can be specified"
)
elif delete_column is not None:
self.kind_column = delete_column
self.deletes = True
else:
assert insert_column is not None
self.kind_column = insert_column
self.deletes = False

self.node.out_edges.append(self)

def signature(self):
return fhash(self.node.signature(), self.delete_column)
return fhash(self.node.signature(), self.kind_column, self.deletes)

def dsschema(self):
input_schema = self.node.dsschema()
# Remove all keys from the schema and make then values
# Remove all keys from the schema and make them values
val_fields = copy.deepcopy(input_schema.keys)
values = input_schema.values
val_fields.update(values)
val_fields.update({self.delete_column: pd.BooleanDtype})
val_fields.update({self.kind_column: pd.BooleanDtype})
return DSSchema(
keys={},
values=val_fields,
@@ -3346,7 +3362,7 @@ def visitChangelog(self, obj) -> DSSchema:
# Changelog operation is allowed on keyed datasets only.
if len(input_schema.keys) == 0:
raise TypeError(
f"UnKey operation is allowed only on keyed datasets. Found dataset without keys in pipeline `{self.pipeline_name}`"
f"Changelog operation is allowed only on keyed datasets. Found dataset without keys in pipeline `{self.pipeline_name}`"
)
output_schema = copy.deepcopy(obj.dsschema())
output_schema.name = (
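A hedged usage sketch of the kwargs validation in Changelog.__init__ above; `keyed` stands for any keyed dataset node and is illustrative:

# keyed = job_opening.groupby("creater_id").latest()   # some keyed node
keyed.changelog(delete="is_delete")      # ok: kind column True on delete rows
keyed.changelog(insert="is_insert")      # ok: kind column True on insert rows
keyed.changelog()                        # ValueError: neither column given
keyed.changelog(delete="d", insert="i")  # ValueError: both columns given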
36 changes: 18 additions & 18 deletions fennel/gen/dataset_pb2.py

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions fennel/gen/dataset_pb2.pyi
@@ -832,10 +832,17 @@ class Changelog(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

OPERAND_ID_FIELD_NUMBER: builtins.int
DELETE_COLUMN_FIELD_NUMBER: builtins.int
KIND_COLUMN_FIELD_NUMBER: builtins.int
DELETES_FIELD_NUMBER: builtins.int
OPERAND_NAME_FIELD_NUMBER: builtins.int
operand_id: builtins.str
delete_column: builtins.str
kind_column: builtins.str
"""Name of column which will store kind value as a boolean"""
deletes: builtins.bool
"""If deletes is true then the kind column will store true
for deletes and false for inserts. If deletes is false then the
kind column will store true for inserts and false for deletes.
"""
operand_name: builtins.str
"""NOTE: FOLLOWING PROPERTIES ARE SET BY THE SERVER AND WILL BE IGNORED BY
THE CLIENT
@@ -844,10 +851,11 @@ def __init__(
self,
*,
operand_id: builtins.str = ...,
delete_column: builtins.str = ...,
kind_column: builtins.str = ...,
deletes: builtins.bool = ...,
operand_name: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["delete_column", b"delete_column", "operand_id", b"operand_id", "operand_name", b"operand_name"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["deletes", b"deletes", "kind_column", b"kind_column", "operand_id", b"operand_id", "operand_name", b"operand_name"]) -> None: ...

global___Changelog = Changelog

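A minimal sketch of building the regenerated message by hand, using only the fields declared in the stub above; the operand id is illustrative:

from fennel.gen import dataset_pb2

# delete-mode changelog: the kind column reads True on delete rows
changelog_op = dataset_pb2.Changelog(
    operand_id="upstream_node_id",  # illustrative id of the input operator
    kind_column="is_delete",
    deletes=True,
)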
82 changes: 44 additions & 38 deletions fennel/gen/expression_pb2.py

Large diffs are not rendered by default.

58 changes: 55 additions & 3 deletions fennel/gen/expression_pb2.pyi
@@ -933,6 +933,9 @@ class StringOp(google.protobuf.message.Message):
CONCAT_FIELD_NUMBER: builtins.int
STRPTIME_FIELD_NUMBER: builtins.int
JSON_DECODE_FIELD_NUMBER: builtins.int
SPLIT_FIELD_NUMBER: builtins.int
JSON_EXTRACT_FIELD_NUMBER: builtins.int
TO_INT_FIELD_NUMBER: builtins.int
@property
def len(self) -> global___Len: ...
@property
@@ -951,6 +954,12 @@ def strptime(self) -> global___Strptime: ...
def strptime(self) -> global___Strptime: ...
@property
def json_decode(self) -> global___JsonDecode: ...
@property
def split(self) -> global___Split: ...
@property
def json_extract(self) -> global___JsonExtract: ...
@property
def to_int(self) -> global___ToInt: ...
def __init__(
self,
*,
@@ -963,10 +972,13 @@ def __init__(
concat: global___Concat | None = ...,
strptime: global___Strptime | None = ...,
json_decode: global___JsonDecode | None = ...,
split: global___Split | None = ...,
json_extract: global___JsonExtract | None = ...,
to_int: global___ToInt | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["concat", b"concat", "contains", b"contains", "endswith", b"endswith", "fn_type", b"fn_type", "json_decode", b"json_decode", "len", b"len", "startswith", b"startswith", "strptime", b"strptime", "tolower", b"tolower", "toupper", b"toupper"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["concat", b"concat", "contains", b"contains", "endswith", b"endswith", "fn_type", b"fn_type", "json_decode", b"json_decode", "len", b"len", "startswith", b"startswith", "strptime", b"strptime", "tolower", b"tolower", "toupper", b"toupper"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["fn_type", b"fn_type"]) -> typing_extensions.Literal["len", "tolower", "toupper", "contains", "startswith", "endswith", "concat", "strptime", "json_decode"] | None: ...
def HasField(self, field_name: typing_extensions.Literal["concat", b"concat", "contains", b"contains", "endswith", b"endswith", "fn_type", b"fn_type", "json_decode", b"json_decode", "json_extract", b"json_extract", "len", b"len", "split", b"split", "startswith", b"startswith", "strptime", b"strptime", "to_int", b"to_int", "tolower", b"tolower", "toupper", b"toupper"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["concat", b"concat", "contains", b"contains", "endswith", b"endswith", "fn_type", b"fn_type", "json_decode", b"json_decode", "json_extract", b"json_extract", "len", b"len", "split", b"split", "startswith", b"startswith", "strptime", b"strptime", "to_int", b"to_int", "tolower", b"tolower", "toupper", b"toupper"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["fn_type", b"fn_type"]) -> typing_extensions.Literal["len", "tolower", "toupper", "contains", "startswith", "endswith", "concat", "strptime", "json_decode", "split", "json_extract", "to_int"] | None: ...

global___StringOp = StringOp

@@ -1239,3 +1251,43 @@ class Part(google.protobuf.message.Message):
def ClearField(self, field_name: typing_extensions.Literal["timezone", b"timezone", "unit", b"unit"]) -> None: ...

global___Part = Part

@typing_extensions.final
class Split(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

SEP_FIELD_NUMBER: builtins.int
sep: builtins.str
def __init__(
self,
*,
sep: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["sep", b"sep"]) -> None: ...

global___Split = Split

@typing_extensions.final
class JsonExtract(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

PATH_FIELD_NUMBER: builtins.int
path: builtins.str
def __init__(
self,
*,
path: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["path", b"path"]) -> None: ...

global___JsonExtract = JsonExtract

@typing_extensions.final
class ToInt(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

def __init__(
self,
) -> None: ...

global___ToInt = ToInt
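
The Split, JsonExtract, and ToInt stubs above are regenerated expression protos bundled with this commit; a hedged sketch constructing them, with field names from the stubs and values illustrative:

from fennel.gen import expression_pb2

split = expression_pb2.Split(sep=",")                   # split a string on ","
extract = expression_pb2.JsonExtract(path="$.user.id")  # illustrative JSON path
to_int = expression_pb2.ToInt()                         # parse string to int
op = expression_pb2.StringOp(split=split)               # one arm of the fn_type oneof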
3 changes: 2 additions & 1 deletion fennel/internal_lib/to_proto/serializer.py
@@ -386,6 +386,7 @@ def visitChangelog(self, obj):
ds_version=self.dataset_version,
changelog=proto.Changelog(
operand_id=self.visit(obj.node),
delete_column=obj.delete_column,
kind_column=obj.kind_column,
deletes=obj.deletes,
),
)
4 changes: 2 additions & 2 deletions fennel/testing/executor.py
@@ -954,8 +954,8 @@ def visitChangelog(self, obj):
delete_df[input_ret.timestamp_field] = delete_df[
FENNEL_DELETE_TIMESTAMP
]
df[obj.delete_column] = False
delete_df[obj.delete_column] = True
df[obj.kind_column] = not obj.deletes
delete_df[obj.kind_column] = obj.deletes
df = pd.concat([df, delete_df])
df = df.drop(columns=[FENNEL_DELETE_TIMESTAMP])
# Sort the dataframe by timestamp
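A hedged pandas sketch of the executor step above: live rows take one polarity of the kind column, the delete half takes the other, and the two are concatenated. The FENNEL_DELETE_TIMESTAMP bookkeeping and final sort are omitted.

import pandas as pd

def apply_changelog(df: pd.DataFrame, delete_df: pd.DataFrame,
                    kind_column: str, deletes: bool) -> pd.DataFrame:
    """Sketch of visitChangelog: deletes=True means the kind column
    reads True on delete rows; deletes=False flips the polarity."""
    df = df.copy()
    delete_df = delete_df.copy()
    df[kind_column] = not deletes     # insert / live rows
    delete_df[kind_column] = deletes  # delete rows
    return pd.concat([df, delete_df], ignore_index=True)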
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.39"
version = "1.5.40"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <[email protected]>"]
packages = [{ include = "fennel" }]
