diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index ffa25defb..d5cab1b77 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.6.0] - 2024-12-10 +- Allow None as default value for min/max/avg/stddev aggregations. + ## [1.5.58] - 2024-11-24 - Allow min/max aggregation on date, datetime and decimal dtypes diff --git a/fennel/client_tests/test_complex_aggregation.py b/fennel/client_tests/test_complex_aggregation.py index a7308e06c..69355aaa6 100644 --- a/fennel/client_tests/test_complex_aggregation.py +++ b/fennel/client_tests/test_complex_aggregation.py @@ -1,5 +1,6 @@ from datetime import datetime, timezone, date from decimal import Decimal as PythonDecimal +from typing import Optional import pandas as pd import pytest @@ -13,6 +14,8 @@ Dataset, Max, Min, + Average, + Stddev, ) from fennel.dtypes import Decimal, Continuous from fennel.featuresets import featureset, feature as F @@ -202,3 +205,139 @@ class CountryFeatures: datetime(1987, 6, 6, tzinfo=timezone.utc), datetime(1970, 1, 2, tzinfo=timezone.utc), ] + + +@pytest.mark.integration +@mock +def test_none_default(client): + @source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="append") + @dataset + class UserInfoDataset: + user_id: int + country: str + income: Decimal[2] + timestamp: datetime = field(timestamp=True) + + @dataset(index=True) + class CountryDS: + country: str = field(key=True) + min_income: Optional[Decimal[2]] + max_income: Optional[Decimal[2]] + avg_income: Optional[float] + stddev_income: Optional[float] + timestamp: datetime = field(timestamp=True) + + @pipeline + @inputs(UserInfoDataset) + def avg_income_pipeline(cls, event: Dataset): + return event.groupby("country").aggregate( + min_income=Min( + of="income", + window=Continuous("forever"), + default=None, + ), + max_income=Max( + of="income", + window=Continuous("forever"), + default=None, + ), + avg_income=Average( + of="income", + window=Continuous("forever"), + default=None, + ), + stddev_income=Stddev( + of="income", + window=Continuous("forever"), + default=None, + ), + ) + + @featureset + class CountryFeatures: + country: str + min_income: Decimal[2] = F( + CountryDS.min_income, default=PythonDecimal("1.20") + ) + max_income: Decimal[2] = F( + CountryDS.max_income, default=PythonDecimal("2.20") + ) + avg_income: float = F(CountryDS.avg_income, default=1.20) + stddev_income: Optional[float] = F(CountryDS.stddev_income) + + # Sync the dataset + response = client.commit( + message="msg", + datasets=[UserInfoDataset, CountryDS], + featuresets=[CountryFeatures], + ) + assert response.status_code == requests.codes.OK, response.json() + + client.sleep(30) + + now = datetime.now(timezone.utc) + df = pd.DataFrame( + { + "user_id": [1, 2, 3, 4, 5], + "country": ["India", "USA", "India", "USA", "UK"], + "income": [ + PythonDecimal("1200.10"), + PythonDecimal("1000.10"), + PythonDecimal("1400.10"), + PythonDecimal("90.10"), + PythonDecimal("1100.10"), + ], + "timestamp": [now, now, now, now, now], + } + ) + response = client.log("fennel_webhook", "UserInfoDataset", df) + assert response.status_code == requests.codes.OK, response.json() + + client.sleep() + + df = client.query( + inputs=[CountryFeatures.country], + outputs=[CountryFeatures], + input_dataframe=pd.DataFrame( + {"CountryFeatures.country": ["India", "USA", "UK", "China"]} + ), + ) + assert df.shape == (4, 5) + assert df["CountryFeatures.country"].tolist() == [ + "India", + "USA", + "UK", + "China", + ] + assert df["CountryFeatures.min_income"].tolist() == [ + PythonDecimal("1200.10"), + PythonDecimal("90.10"), + PythonDecimal("1100.10"), + PythonDecimal("1.20"), + ] + assert df["CountryFeatures.max_income"].tolist() == [ + PythonDecimal("1400.10"), + PythonDecimal("1000.10"), + PythonDecimal("1100.10"), + PythonDecimal("2.20"), + ] + if client.is_integration_client(): + assert df["CountryFeatures.avg_income"].tolist() == [ + 1300.1, + 545.1, + 1100.1, + 1.2, + ] + else: + assert df["CountryFeatures.avg_income"].tolist() == [ + 1300.1000000000001, + 545.1, + 1100.1000000000001, + 1.2, + ] + assert df["CountryFeatures.stddev_income"].tolist() == [ + 100.0, + 455.0, + 0, + pd.NA, + ] diff --git a/fennel/datasets/aggregate.py b/fennel/datasets/aggregate.py index 8c5bc47b3..5a6f3c46a 100644 --- a/fennel/datasets/aggregate.py +++ b/fennel/datasets/aggregate.py @@ -1,11 +1,17 @@ from datetime import date, datetime from decimal import Decimal as PythonDecimal -from typing import List, Union, Optional +from typing import List, Union, Optional, Any import fennel.gen.spec_pb2 as spec_proto +import fennel.gen.schema_pb2 as schema_proto from fennel._vendor.pydantic import BaseModel, Extra, validator # type: ignore from fennel.dtypes import Continuous, Tumbling, Hopping from fennel.internal_lib.duration import Duration, duration_to_timedelta +from fennel.internal_lib.utils.utils import ( + to_timestamp_proto, + to_date_proto, + to_decimal_proto, +) ItemType = Union[str, List[str]] @@ -109,7 +115,7 @@ class Sum(AggregateType): def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for Sum") return spec_proto.PreSpec( sum=spec_proto.Sum( window=self.window.to_proto(), @@ -124,17 +130,24 @@ def signature(self): class Average(AggregateType): of: str - default: float = 0.0 + default: Optional[float] = 0.0 def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for Average") + if self.default is None: + default = 0.0 + default_null = True + else: + default = self.default + default_null = False return spec_proto.PreSpec( average=spec_proto.Average( window=self.window.to_proto(), name=self.into_field, of=self.of, - default=self.default, + default=default, + default_null=default_null, ) ) @@ -150,7 +163,7 @@ class Quantile(AggregateType): def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for Quantile") return spec_proto.PreSpec( quantile=spec_proto.Quantile( window=self.window.to_proto(), @@ -183,6 +196,8 @@ class ExpDecaySum(AggregateType): half_life: Duration def to_proto(self): + if self.window is None: + raise ValueError("Window must be specified for ExpDecaySum") half_life = duration_to_timedelta(self.half_life) return spec_proto.PreSpec( exp_decay=spec_proto.ExponentialDecayAggregate( @@ -214,25 +229,42 @@ def validate(self): class Max(AggregateType): of: str - default: Union[float, int, date, datetime, PythonDecimal] + default: Any def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for Max") if isinstance(self.default, datetime): default = float(self.default.timestamp() * 1000000.0) + default_value = schema_proto.Value( + timestamp=to_timestamp_proto(self.default) + ) elif isinstance(self.default, date): default = float((self.default - date(1970, 1, 1)).days) + default_value = schema_proto.Value(date=to_date_proto(self.default)) elif isinstance(self.default, PythonDecimal): default = float(self.default) - else: + default_value = schema_proto.Value( + decimal=to_decimal_proto(self.default) + ) + elif isinstance(self.default, float): + default = self.default + default_value = schema_proto.Value(float=self.default) + elif isinstance(self.default, int): default = float(self.default) + default_value = schema_proto.Value(int=self.default) + elif self.default is None: + default_value = schema_proto.Value(none=schema_proto.Value().none) + default = 0.0 + else: + raise ValueError(f"invalid default value for Min: `{self.default}`") return spec_proto.PreSpec( max=spec_proto.Max( window=self.window.to_proto(), name=self.into_field, of=self.of, default=default, + default_value=default_value, ) ) @@ -245,25 +277,42 @@ def agg_type(self): class Min(AggregateType): of: str - default: Union[float, int, date, datetime, PythonDecimal] + default: Any def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for Min") if isinstance(self.default, datetime): default = float(self.default.timestamp() * 1000000.0) + default_value = schema_proto.Value( + timestamp=to_timestamp_proto(self.default) + ) elif isinstance(self.default, date): default = float((self.default - date(1970, 1, 1)).days) + default_value = schema_proto.Value(date=to_date_proto(self.default)) elif isinstance(self.default, PythonDecimal): default = float(self.default) - else: + default_value = schema_proto.Value( + decimal=to_decimal_proto(self.default) + ) + elif isinstance(self.default, float): + default = self.default + default_value = schema_proto.Value(float=self.default) + elif isinstance(self.default, int): default = float(self.default) + default_value = schema_proto.Value(int=self.default) + elif self.default is None: + default_value = schema_proto.Value(none=schema_proto.Value().none) + default = 0.0 + else: + raise ValueError(f"invalid default value for Min: `{self.default}`") return spec_proto.PreSpec( min=spec_proto.Min( window=self.window.to_proto(), name=self.into_field, of=self.of, default=default, + default_value=default_value, ) ) @@ -324,16 +373,23 @@ def signature(self): class Stddev(AggregateType): of: str - default: float = -1.0 + default: Optional[float] = -1.0 def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for Stddev") + if self.default is None: + default = 0.0 + default_null = True + else: + default = self.default + default_null = False return spec_proto.PreSpec( stddev=spec_proto.Stddev( window=self.window.to_proto(), name=self.into_field, - default=self.default, + default=default, + default_null=default_null, of=self.of, ) ) diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index b9455168f..9264023a5 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2799,7 +2799,10 @@ def visitAggregate(self, obj) -> DSSchema: raise TypeError( f"Cannot take average of field `{agg.of}` of type `{dtype_to_string(dtype)}`" ) - values[agg.into_field] = pd.Float64Dtype # type: ignore + if agg.default is None: + values[agg.into_field] = Optional[pd.Float64Dtype] # type: ignore + else: + values[agg.into_field] = pd.Float64Dtype # type: ignore elif isinstance(agg, LastK): dtype = input_schema.get_type(agg.of) if agg.dropnull: @@ -2839,7 +2842,10 @@ def visitAggregate(self, obj) -> DSSchema: raise TypeError( f"invalid min: default value `{agg.default}` not of type `int`" ) - values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore + if agg.default is None: + values[agg.into_field] = Optional[fennel_get_optional_inner(dtype)] # type: ignore + else: + values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore elif isinstance(agg, Max): dtype = input_schema.get_type(agg.of) primtive_dtype = get_primitive_dtype_with_optional(dtype) @@ -2857,7 +2863,10 @@ def visitAggregate(self, obj) -> DSSchema: raise TypeError( f"invalid max: default value `{agg.default}` not of type `int`" ) - values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore + if agg.default is None: + values[agg.into_field] = Optional[fennel_get_optional_inner(dtype)] # type: ignore + else: + values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore elif isinstance(agg, Stddev): dtype = input_schema.get_type(agg.of) if ( @@ -2867,7 +2876,10 @@ def visitAggregate(self, obj) -> DSSchema: raise TypeError( f"Cannot get standard deviation of field {agg.of} of type {dtype_to_string(dtype)}" ) - values[agg.into_field] = pd.Float64Dtype # type: ignore + if agg.default is None: + values[agg.into_field] = Optional[pd.Float64Dtype] # type: ignore + else: + values[agg.into_field] = pd.Float64Dtype # type: ignore elif isinstance(agg, Quantile): dtype = input_schema.get_type(agg.of) if ( diff --git a/fennel/gen/spec_pb2.py b/fennel/gen/spec_pb2.py index c0ca8043c..33a3db458 100644 --- a/fennel/gen/spec_pb2.py +++ b/fennel/gen/spec_pb2.py @@ -13,37 +13,38 @@ import fennel.gen.window_pb2 as window__pb2 +import fennel.gen.schema_pb2 as schema__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"\x7f\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\x1a\x0cschema.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"w\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x14\n\x0c\x64\x65\x66\x61ult_null\x18\x05 \x01(\x08\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"\x7f\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"\x90\x01\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x31\n\rdefault_value\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.schema.Value\"\x90\x01\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x31\n\rdefault_value\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.schema.Value\"v\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x12\x14\n\x0c\x64\x65\x66\x61ult_null\x18\x05 \x01(\x08\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'spec_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_PRESPEC']._serialized_start=48 - _globals['_PRESPEC']._serialized_end=575 - _globals['_SUM']._serialized_start=577 - _globals['_SUM']._serialized_end=653 - _globals['_AVERAGE']._serialized_start=655 - _globals['_AVERAGE']._serialized_end=752 - _globals['_COUNT']._serialized_start=755 - _globals['_COUNT']._serialized_end=883 - _globals['_LASTK']._serialized_start=885 - _globals['_LASTK']._serialized_end=1011 - _globals['_FIRSTK']._serialized_start=1013 - _globals['_FIRSTK']._serialized_end=1140 - _globals['_MIN']._serialized_start=1142 - _globals['_MIN']._serialized_end=1235 - _globals['_MAX']._serialized_start=1237 - _globals['_MAX']._serialized_end=1330 - _globals['_STDDEV']._serialized_start=1332 - _globals['_STDDEV']._serialized_end=1428 - _globals['_DISTINCT']._serialized_start=1430 - _globals['_DISTINCT']._serialized_end=1529 - _globals['_QUANTILE']._serialized_start=1532 - _globals['_QUANTILE']._serialized_end=1681 - _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1683 - _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1808 + _globals['_PRESPEC']._serialized_start=62 + _globals['_PRESPEC']._serialized_end=589 + _globals['_SUM']._serialized_start=591 + _globals['_SUM']._serialized_end=667 + _globals['_AVERAGE']._serialized_start=669 + _globals['_AVERAGE']._serialized_end=788 + _globals['_COUNT']._serialized_start=791 + _globals['_COUNT']._serialized_end=919 + _globals['_LASTK']._serialized_start=921 + _globals['_LASTK']._serialized_end=1047 + _globals['_FIRSTK']._serialized_start=1049 + _globals['_FIRSTK']._serialized_end=1176 + _globals['_MIN']._serialized_start=1179 + _globals['_MIN']._serialized_end=1323 + _globals['_MAX']._serialized_start=1326 + _globals['_MAX']._serialized_end=1470 + _globals['_STDDEV']._serialized_start=1472 + _globals['_STDDEV']._serialized_end=1590 + _globals['_DISTINCT']._serialized_start=1592 + _globals['_DISTINCT']._serialized_end=1691 + _globals['_QUANTILE']._serialized_start=1694 + _globals['_QUANTILE']._serialized_end=1843 + _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1845 + _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1970 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/spec_pb2.pyi b/fennel/gen/spec_pb2.pyi index 99baeba0a..a85da2377 100644 --- a/fennel/gen/spec_pb2.pyi +++ b/fennel/gen/spec_pb2.pyi @@ -5,6 +5,7 @@ isort:skip_file import builtins import google.protobuf.descriptor import google.protobuf.message +import schema_pb2 import sys import window_pb2 @@ -104,11 +105,13 @@ class Average(google.protobuf.message.Message): NAME_FIELD_NUMBER: builtins.int WINDOW_FIELD_NUMBER: builtins.int DEFAULT_FIELD_NUMBER: builtins.int + DEFAULT_NULL_FIELD_NUMBER: builtins.int of: builtins.str name: builtins.str @property def window(self) -> window_pb2.Window: ... default: builtins.float + default_null: builtins.bool def __init__( self, *, @@ -116,9 +119,10 @@ class Average(google.protobuf.message.Message): name: builtins.str = ..., window: window_pb2.Window | None = ..., default: builtins.float = ..., + default_null: builtins.bool = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_null", b"default_null", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... global___Average = Average @@ -226,11 +230,14 @@ class Min(google.protobuf.message.Message): NAME_FIELD_NUMBER: builtins.int WINDOW_FIELD_NUMBER: builtins.int DEFAULT_FIELD_NUMBER: builtins.int + DEFAULT_VALUE_FIELD_NUMBER: builtins.int of: builtins.str name: builtins.str @property def window(self) -> window_pb2.Window: ... default: builtins.float + @property + def default_value(self) -> schema_pb2.Value: ... def __init__( self, *, @@ -238,9 +245,10 @@ class Min(google.protobuf.message.Message): name: builtins.str = ..., window: window_pb2.Window | None = ..., default: builtins.float = ..., + default_value: schema_pb2.Value | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["default_value", b"default_value", "window", b"window"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_value", b"default_value", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... global___Min = Min @@ -252,11 +260,14 @@ class Max(google.protobuf.message.Message): NAME_FIELD_NUMBER: builtins.int WINDOW_FIELD_NUMBER: builtins.int DEFAULT_FIELD_NUMBER: builtins.int + DEFAULT_VALUE_FIELD_NUMBER: builtins.int of: builtins.str name: builtins.str @property def window(self) -> window_pb2.Window: ... default: builtins.float + @property + def default_value(self) -> schema_pb2.Value: ... def __init__( self, *, @@ -264,9 +275,10 @@ class Max(google.protobuf.message.Message): name: builtins.str = ..., window: window_pb2.Window | None = ..., default: builtins.float = ..., + default_value: schema_pb2.Value | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["default_value", b"default_value", "window", b"window"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_value", b"default_value", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... global___Max = Max @@ -278,11 +290,13 @@ class Stddev(google.protobuf.message.Message): NAME_FIELD_NUMBER: builtins.int WINDOW_FIELD_NUMBER: builtins.int DEFAULT_FIELD_NUMBER: builtins.int + DEFAULT_NULL_FIELD_NUMBER: builtins.int of: builtins.str name: builtins.str @property def window(self) -> window_pb2.Window: ... default: builtins.float + default_null: builtins.bool def __init__( self, *, @@ -290,9 +304,10 @@ class Stddev(google.protobuf.message.Message): name: builtins.str = ..., window: window_pb2.Window | None = ..., default: builtins.float = ..., + default_null: builtins.bool = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "default_null", b"default_null", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... global___Stddev = Stddev diff --git a/fennel/internal_lib/utils/utils.py b/fennel/internal_lib/utils/utils.py index 2c8688d3b..fe0c22250 100644 --- a/fennel/internal_lib/utils/utils.py +++ b/fennel/internal_lib/utils/utils.py @@ -1,11 +1,13 @@ import dataclasses -from datetime import datetime -from decimal import Decimal +from decimal import Decimal as PythonDecimal +from datetime import datetime, date from typing import Any, Optional, Union, Dict import numpy as np import pandas as pd from frozendict import frozendict +from google.protobuf.timestamp_pb2 import Timestamp + from fennel.gen import schema_pb2 as schema_proto from fennel.gen.schema_pb2 import DataType from fennel.internal_lib import FENNEL_STRUCT @@ -118,8 +120,8 @@ def cast_col_to_pandas( return pd.Series( [ ( - Decimal("%0.{}f".format(scale) % float(x)) - if not isinstance(x, Decimal) + PythonDecimal("%0.{}f".format(scale) % float(x)) + if not isinstance(x, PythonDecimal) else x ) for x in series @@ -219,3 +221,21 @@ def parse_datetime_in_value( return output else: return value + + +def to_timestamp_proto(dt: datetime) -> Timestamp: + ts = Timestamp() + ts.FromDatetime(dt) + return ts + + +def to_date_proto(dt: date) -> schema_proto.Date: + return schema_proto.Date( + days=(dt - date(1970, 1, 1)).days, + ) + + +def to_decimal_proto(decimal: PythonDecimal) -> schema_proto.Decimal: + exponent = abs(int(decimal.as_tuple().exponent)) + value = int((decimal * pow(10, exponent)).to_integral_exact()) + return schema_proto.Decimal(value=value, scale=exponent) diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index 073629ae3..09d55c4f1 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -5,7 +5,7 @@ from datetime import datetime, timezone, timedelta, date from decimal import Decimal from math import sqrt -from typing import Dict, List, Type, Union, Any +from typing import Dict, List, Type, Union, Any, Optional import numpy as np import pandas as pd @@ -305,7 +305,9 @@ def top(self): class MinState(AggState): - def __init__(self, default: Union[float, int, date, datetime, Decimal]): + def __init__( + self, default: Optional[Union[float, int, date, datetime, Decimal]] + ): self.counter = Counter() # type: ignore self.min_heap = Heap(heap_type="min") self.default = default @@ -336,7 +338,9 @@ def get_val(self): class MaxState(AggState): - def __init__(self, default: Union[float, int, date, datetime, Decimal]): + def __init__( + self, default: Optional[Union[float, int, date, datetime, Decimal]] + ): self.counter = Counter() # type: ignore self.max_heap = Heap(heap_type="max") self.default = default diff --git a/fennel/testing/executor.py b/fennel/testing/executor.py index bfd6e69d1..b9527b4e9 100644 --- a/fennel/testing/executor.py +++ b/fennel/testing/executor.py @@ -1,13 +1,10 @@ import copy import types from dataclasses import dataclass -from datetime import datetime, timezone from typing import Any, Optional, Dict, List -import numpy as np import pandas as pd from fennel.expr.visitor import ExprPrinter -import pyarrow as pa from frozendict import frozendict import fennel.gen.schema_pb2 as schema_proto @@ -28,7 +25,6 @@ Stddev, ) from fennel.gen.schema_pb2 import Field -from fennel.internal_lib.duration import duration_to_timedelta from fennel.internal_lib.schema import get_datatype, fennel_is_optional from fennel.internal_lib.schema import validate_field_in_df from fennel.internal_lib.to_proto import ( @@ -510,13 +506,13 @@ def visitJoin(self, obj) -> Optional[NodeRet]: list(obj.dataset.dsschema().values.keys()) ) merged_df = left_join_empty( - input_ret, right_ret, right_value_schema, obj.fields + input_ret, right_ret, right_value_schema, obj.fields # type: ignore ) else: if len(input_ret.key_fields) > 0: merged_df = table_table_join( - input_ret, - right_ret, + input_ret, # type: ignore + right_ret, # type: ignore obj.how, obj.on, obj.left_on, @@ -525,8 +521,8 @@ def visitJoin(self, obj) -> Optional[NodeRet]: ) else: merged_df = stream_table_join( - input_ret, - right_ret, + input_ret, # type: ignore + right_ret, # type: ignore obj.how, obj.within, obj.on, diff --git a/pyproject.toml b/pyproject.toml index 000ed2d38..9dbdff24b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.58" +version = "1.6.0" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]