Skip to content

Commit

Permalink
Fix bug in signature of assign operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
nonibansal authored and nikhilgarg28 committed Sep 26, 2024
1 parent 7ed5ff6 commit 15ec345
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 14 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.31] - 2024-09-26
- Fix bug in signature of assign operator.

## [1.5.30] - 2024-09-24
- Add hour, minute, second, millisecond, microsecond, day, week accessors as properties instead of methods.

Expand Down
20 changes: 7 additions & 13 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,27 +521,21 @@ def from_expressions(cls, self, **kwargs):
return Assign(self, None, None, None, **kwargs)

def signature(self):
    """Return a hash that identifies this assign operator.

    The hash combines the identity of the upstream node with this
    operator's own configuration, so that two assigns which read the same
    input but compute different things do not share a signature.

    Returns:
        The value produced by ``fhash`` over the upstream identity and
        either the python UDF description or the output expressions.
    """
    # A Dataset input is identified by its name; any other upstream node
    # is identified by its own signature. Computed once so that both
    # branches below hash exactly the same upstream identity.
    if isinstance(self.node, Dataset):
        item = self.node._name
    else:
        item = self.node.signature()

    if self.assign_type == UDFType.python:
        return fhash(
            item,
            self.func,
            self.column,
            self.output_type.__name__,
        )

    # Non-python assigns must hash their output expressions even when the
    # input is a plain Dataset. NOTE(review): from_expressions appears to
    # pass None for func/column/output_type, so hashing those fields alone
    # would make every expression assign on one dataset collide — confirm
    # against the constructor.
    return fhash(
        item,
        self.output_expressions,
    )

Expand Down
178 changes: 178 additions & 0 deletions fennel/datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
FirstK,
)
from fennel.dtypes import Embedding, Window, Continuous, Session
from fennel.expr import lit
from fennel.gen.services_pb2 import SyncRequest
from fennel.lib import includes, meta, inputs, desc
from fennel.testing import *
Expand Down Expand Up @@ -3523,3 +3524,180 @@ def from_a_to_d(cls, a: Dataset):
assert operators[4] == expected_operator_request, error_message(
operators[4], expected_operator_request
)


def test_complex_union():
    """Check the operators emitted for a union of three expression assigns.

    Two of the assigns read from ``Views`` and differ only in the literal
    they produce; the expected protos below give each its own operator id.
    """

    @source(
        webhook.endpoint("Views"),
        cdc="append",
        disorder="14d",
    )
    @dataset
    class Views:
        user_id: int
        event_id: int
        t: datetime

    # NOTE(review): Clicks is sourced from the "Views" endpoint as well;
    # this looks like a copy-paste — confirm it is intentional.
    @source(
        webhook.endpoint("Views"),
        cdc="append",
        disorder="14d",
    )
    @dataset
    class Clicks:
        user_id: int
        event_id: int
        t: datetime

    @dataset
    class Events:
        user_id: int
        event_id: int
        event_type: str
        t: datetime

        @pipeline
        @inputs(Views, Clicks)
        def my_pipeline(cls, views: Dataset, clicks: Dataset):
            first_view = views.assign(event_type=lit("view_1").astype(str))
            second_view = views.assign(event_type=lit("view_2").astype(str))
            click = clicks.assign(event_type=lit("clicks").astype(str))
            return first_view + second_view + click

    client = InternalTestClient()
    for ds in (Views, Clicks, Events):
        client.add(ds)
    sync_request = client._get_sync_request_proto()
    assert len(sync_request.datasets) == 3
    assert len(sync_request.pipelines) == 1
    assert len(sync_request.operators) == 7

    # Operator ids expected for each non-source node of the pipeline.
    view_1_id = "2136094350b1271a0502789f7759d022"
    view_2_id = "3ef5d23a31640e82164d5e51603d0885"
    views_union_id = "d245616918044075e99a6f19d40b0b42"
    clicks_id = "f2701fd78f5dc1494e519aba5d8fc934"
    root_union_id = "fa08df185243417fc9c8e7fb819a4753"

    def operator(op_id, **body):
        # Fields shared by every operator of this pipeline.
        return {
            "id": op_id,
            "pipelineName": "my_pipeline",
            "datasetName": "Events",
            "dsVersion": 1,
            **body,
        }

    def dataset_ref(name):
        return operator(name, datasetRef={"referringDatasetName": name})

    def assign(op_id, operand, json_literal):
        return operator(
            op_id,
            assignExpr={
                "operandId": operand,
                "outputTypes": {
                    "event_type": {"stringType": {}},
                },
                "exprs": {
                    "event_type": {
                        "jsonLiteral": {
                            "dtype": {"stringType": {}},
                            "literal": json_literal,
                        }
                    }
                },
            },
        )

    def union(op_id, operand_ids, **extra):
        return operator(op_id, union={"operandIds": operand_ids}, **extra)

    # Expected operators, in the order the sync request lists them.
    expected_operators = [
        dataset_ref("Views"),
        assign(view_1_id, "Views", '"view_1"'),
        assign(view_2_id, "Views", '"view_2"'),
        union(views_union_id, [view_1_id, view_2_id]),
        dataset_ref("Clicks"),
        assign(clicks_id, "Clicks", '"clicks"'),
        union(root_union_id, [views_union_id, clicks_id], isRoot=True),
    ]

    operators = sync_request.operators
    for position, spec in enumerate(expected_operators):
        expected_operator_request = ParseDict(spec, ds_proto.Operator())
        assert (
            operators[position] == expected_operator_request
        ), error_message(operators[position], expected_operator_request)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.30"
version = "1.5.31"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <[email protected]>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit 15ec345

Please sign in to comment.