diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md
index 703820a8..2a1e346d 100644
--- a/fennel/CHANGELOG.md
+++ b/fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## [1.5.31] - 2024-09-26
+- Fix bug in signature of assign operator.
+
 ## [1.5.30] - 2024-09-24
 - Add hour, minute, second, millisecond, microsecond, day, week accessors as properties instead of methods.
 
diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py
index 54b9db3c..0224bc5d 100644
--- a/fennel/datasets/datasets.py
+++ b/fennel/datasets/datasets.py
@@ -521,27 +521,21 @@ def from_expressions(cls, self, **kwargs):
         return Assign(self, None, None, None, **kwargs)
 
     def signature(self):
-        if isinstance(self.node, Dataset):
-            return fhash(
-                self.node._name,
-                self.func,
-                self.column,
-                (
-                    self.output_type.__name__
-                    if self.output_type is not None
-                    else None
-                ),
-            )
+        item = (
+            self.node._name
+            if isinstance(self.node, Dataset)
+            else self.node.signature()
+        )
         if self.assign_type == UDFType.python:
             return fhash(
-                self.node.signature(),
+                item,
                 self.func,
                 self.column,
                 self.output_type.__name__,
             )
         else:
             return fhash(
-                self.node.signature(),
+                item,
                 self.output_expressions,
             )
 
diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py
index 9fc4a8a8..4fbdf328 100644
--- a/fennel/datasets/test_dataset.py
+++ b/fennel/datasets/test_dataset.py
@@ -22,6 +22,7 @@
     FirstK,
 )
 from fennel.dtypes import Embedding, Window, Continuous, Session
+from fennel.expr import lit
 from fennel.gen.services_pb2 import SyncRequest
 from fennel.lib import includes, meta, inputs, desc
 from fennel.testing import *
@@ -3523,3 +3524,180 @@ def from_a_to_d(cls, a: Dataset):
     assert operators[4] == expected_operator_request, error_message(
         operators[4], expected_operator_request
     )
+
+
+def test_complex_union():
+    @source(
+        webhook.endpoint("Views"),
+        cdc="append",
+        disorder="14d",
+    )
+    @dataset
+    class Views:
+        user_id: int
+        event_id: int
+        t: datetime
+
+    @source(
+        webhook.endpoint("Views"),
+        cdc="append",
+        disorder="14d",
+    )
+    @dataset
+    class Clicks:
+        user_id: int
+        event_id: int
+        t: datetime
+
+    @dataset
+    class Events:
+        user_id: int
+        event_id: int
+        event_type: str
+        t: datetime
+
+        @pipeline
+        @inputs(Views, Clicks)
+        def my_pipeline(cls, views: Dataset, clicks: Dataset):
+            a = views.assign(event_type=lit("view_1").astype(str))
+            b = views.assign(event_type=lit("view_2").astype(str))
+            c = clicks.assign(event_type=lit("clicks").astype(str))
+            return a + b + c
+
+    view = InternalTestClient()
+    view.add(Views)
+    view.add(Clicks)
+    view.add(Events)
+    sync_request = view._get_sync_request_proto()
+    assert len(sync_request.datasets) == 3
+    assert len(sync_request.pipelines) == 1
+    assert len(sync_request.operators) == 7
+
+    operators = sync_request.operators
+    ds_ref = {
+        "id": "Views",
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "datasetRef": {"referringDatasetName": "Views"},
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(ds_ref, ds_proto.Operator())
+    assert operators[0] == expected_operator_request, error_message(
+        operators[0], expected_operator_request
+    )
+    assign = {
+        "id": "2136094350b1271a0502789f7759d022",
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "assignExpr": {
+            "operandId": "Views",
+            "outputTypes": {
+                "event_type": {"stringType": {}},
+            },
+            "exprs": {
+                "event_type": {
+                    "jsonLiteral": {
+                        "dtype": {"stringType": {}},
+                        "literal": '"view_1"',
+                    }
+                }
+            },
+        },
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(assign, ds_proto.Operator())
+    assert operators[1] == expected_operator_request, error_message(
+        operators[1], expected_operator_request
+    )
+    assign = {
+        "id": "3ef5d23a31640e82164d5e51603d0885",
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "assignExpr": {
+            "operandId": "Views",
+            "outputTypes": {
+                "event_type": {"stringType": {}},
+            },
+            "exprs": {
+                "event_type": {
+                    "jsonLiteral": {
+                        "dtype": {"stringType": {}},
+                        "literal": '"view_2"',
+                    }
+                }
+            },
+        },
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(assign, ds_proto.Operator())
+    assert operators[2] == expected_operator_request, error_message(
+        operators[2], expected_operator_request
+    )
+    union = {
+        "id": "d245616918044075e99a6f19d40b0b42",
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "union": {
+            "operandIds": [
+                "2136094350b1271a0502789f7759d022",
+                "3ef5d23a31640e82164d5e51603d0885",
+            ],
+        },
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(union, ds_proto.Operator())
+    assert operators[3] == expected_operator_request, error_message(
+        operators[3], expected_operator_request
+    )
+    ds_ref = {
+        "id": "Clicks",
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "datasetRef": {"referringDatasetName": "Clicks"},
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(ds_ref, ds_proto.Operator())
+    assert operators[4] == expected_operator_request, error_message(
+        operators[4], expected_operator_request
+    )
+    assign = {
+        "id": "f2701fd78f5dc1494e519aba5d8fc934",
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "assignExpr": {
+            "operandId": "Clicks",
+            "outputTypes": {
+                "event_type": {"stringType": {}},
+            },
+            "exprs": {
+                "event_type": {
+                    "jsonLiteral": {
+                        "dtype": {"stringType": {}},
+                        "literal": '"clicks"',
+                    }
+                }
+            },
+        },
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(assign, ds_proto.Operator())
+    assert operators[5] == expected_operator_request, error_message(
+        operators[5], expected_operator_request
+    )
+    union = {
+        "id": "fa08df185243417fc9c8e7fb819a4753",
+        "isRoot": True,
+        "pipelineName": "my_pipeline",
+        "datasetName": "Events",
+        "union": {
+            "operandIds": [
+                "d245616918044075e99a6f19d40b0b42",
+                "f2701fd78f5dc1494e519aba5d8fc934",
+            ],
+        },
+        "dsVersion": 1,
+    }
+    expected_operator_request = ParseDict(union, ds_proto.Operator())
+    assert operators[6] == expected_operator_request, error_message(
+        operators[6], expected_operator_request
+    )
diff --git a/pyproject.toml b/pyproject.toml
index ecef1497..122c864f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "fennel-ai"
-version = "1.5.30"
+version = "1.5.31"
 description = "The modern realtime feature engineering platform"
 authors = ["Fennel AI "]
 packages = [{ include = "fennel" }]
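
For context on the `signature()` change above: the old code took a special branch when the input node was a `Dataset` and hashed only the dataset name, `func`, `column`, and `output_type`. For expression-based assigns those last three fields are unset, so two assigns on the same dataset with different expressions could hash identically, which is what `test_complex_union` now guards against. The snippet below is a minimal standalone sketch of that collision, not Fennel internals; `fhash` is stood in by a small md5 helper, and the literal strings are placeholders for the real hashed fields.

```python
# Minimal sketch of the signature collision fixed above (assumptions: fhash is
# approximated by an md5 helper; field values are illustrative placeholders).
import hashlib


def fhash(*items):
    # Stable hash over the string forms of all items, in the spirit of fhash.
    return hashlib.md5("|".join(repr(i) for i in items).encode()).hexdigest()


# Before the fix: a Dataset-input assign hashed (name, func, column, output_type).
# Expression-based assigns have no python UDF, so those fields are all None and
# two different expressions collide.
old_a = fhash("Views", None, None, None)  # assign(event_type=lit("view_1"))
old_b = fhash("Views", None, None, None)  # assign(event_type=lit("view_2"))
assert old_a == old_b  # collision: both operators get the same signature

# After the fix: expression-based assigns always hash their output expressions,
# so the two operators above get distinct signatures, as the new test asserts.
new_a = fhash("Views", {"event_type": 'lit("view_1")'})
new_b = fhash("Views", {"event_type": 'lit("view_2")'})
assert new_a != new_b
```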