From c7a227b2d2ecda7e9c7a149dbfdf6bdf569ce403 Mon Sep 17 00:00:00 2001 From: Satwant Rana <4613501+satrana42@users.noreply.github.com> Date: Sat, 21 Sep 2024 15:23:50 +0530 Subject: [PATCH] Add FirstK aggregation (#553) Add FirstK aggregation in codebase along the lines of LastK aggregation. --- .wordlist.txt | 1 + docs/api.yml | 3 +- .../api-reference/aggregations/firstk.py | 150 ++++++++++++++++++ .../api-reference/aggregations/firstk.md | 63 ++++++++ fennel/CHANGELOG.md | 3 + fennel/__init__.py | 1 + fennel/client_tests/test_social_network.py | 38 ++++- fennel/client_tests/test_struct_type.py | 124 ++++++++++++++- fennel/datasets/__init__.py | 1 + fennel/datasets/aggregate.py | 24 ++- fennel/datasets/datasets.py | 9 ++ fennel/datasets/test_dataset.py | 20 +++ fennel/gen/auth_pb2.py | 1 + fennel/gen/connector_pb2.py | 1 + fennel/gen/dataset_pb2.py | 1 + fennel/gen/expectations_pb2.py | 1 + fennel/gen/expr_pb2.py | 1 + fennel/gen/featureset_pb2.py | 1 + fennel/gen/format_pb2.py | 1 + fennel/gen/http_auth_pb2.py | 1 + fennel/gen/index_pb2.py | 1 + fennel/gen/kinesis_pb2.py | 1 + fennel/gen/metadata_pb2.py | 1 + fennel/gen/pycode_pb2.py | 1 + fennel/gen/schema_pb2.py | 1 + fennel/gen/schema_registry_pb2.py | 1 + fennel/gen/services_pb2.py | 1 + fennel/gen/spec_pb2.py | 47 +++--- fennel/gen/spec_pb2.pyi | 39 ++++- fennel/gen/status_pb2.py | 1 + fennel/gen/window_pb2.py | 1 + fennel/lib/aggregate/__init__.py | 1 + fennel/testing/execute_aggregation.py | 40 +++++ fennel/testing/executor.py | 3 +- fennel/testing/test_execute_aggregation.py | 30 ++++ pyproject.toml | 2 +- 36 files changed, 585 insertions(+), 31 deletions(-) create mode 100644 docs/examples/api-reference/aggregations/firstk.py create mode 100644 docs/pages/api-reference/aggregations/firstk.md diff --git a/.wordlist.txt b/.wordlist.txt index 074fda262..1f61ba1cb 100644 --- a/.wordlist.txt +++ b/.wordlist.txt @@ -37,6 +37,7 @@ Eval eval endswith FennelDataAccessRole +FirstK Flink Flink's GCP diff --git 
a/docs/api.yml b/docs/api.yml index b8139cf9f..71fa7dcdb 100644 --- a/docs/api.yml +++ b/docs/api.yml @@ -76,6 +76,7 @@ sidebar: - "api-reference/aggregations/count" - "api-reference/aggregations/distinct" - "api-reference/aggregations/lastk" + - "api-reference/aggregations/firstk" - "api-reference/aggregations/max" - "api-reference/aggregations/min" - "api-reference/aggregations/stddev" @@ -171,4 +172,4 @@ sidebar: - slug: "api-reference/expectations" title: "Expectations" pages: - - "api-reference/expectations" \ No newline at end of file + - "api-reference/expectations" diff --git a/docs/examples/api-reference/aggregations/firstk.py b/docs/examples/api-reference/aggregations/firstk.py new file mode 100644 index 000000000..05cb1cd5b --- /dev/null +++ b/docs/examples/api-reference/aggregations/firstk.py @@ -0,0 +1,150 @@ +from datetime import datetime +from typing import List + +import pandas as pd +import pytest + +from fennel.testing import mock + +__owner__ = "satwant@fennel.ai" + + +@mock +def test_basic(client): + # docsnip basic + from fennel.datasets import dataset, field, pipeline, Dataset, FirstK + from fennel.dtypes import Continuous + from fennel.lib import inputs + from fennel.connectors import source, Webhook + + webhook = Webhook(name="webhook") + + @source(webhook.endpoint("Transaction"), disorder="14d", cdc="append") + @dataset + class Transaction: + uid: int + amount: int + timestamp: datetime + + @dataset(index=True) + class Aggregated: + uid: int = field(key=True) + amounts: List[int] + timestamp: datetime + + @pipeline + @inputs(Transaction) + def firstk_pipeline(cls, ds: Dataset): + return ds.groupby("uid").aggregate( + # docsnip-highlight start + amounts=FirstK( + of="amount", + limit=10, + dedup=False, + window=Continuous("1d"), + ), + # docsnip-highlight end + ) + + # /docsnip + client.commit(message="msg", datasets=[Transaction, Aggregated]) + # log some rows to the transaction dataset + client.log( + "webhook", + "Transaction", + 
pd.DataFrame( + [ + { + "uid": 1, + "vendor": "A", + "amount": 10, + "timestamp": "2021-01-01T00:00:00", + }, + { + "uid": 1, + "vendor": "B", + "amount": 20, + "timestamp": "2021-01-02T00:00:00", + }, + { + "uid": 2, + "vendor": "A", + "amount": 30, + "timestamp": "2021-01-03T00:00:00", + }, + { + "uid": 2, + "vendor": "B", + "amount": 40, + "timestamp": "2021-01-04T00:00:00", + }, + { + "uid": 3, + "vendor": "A", + "amount": 50, + "timestamp": "2021-01-05T00:00:00", + }, + { + "uid": 3, + "vendor": "B", + "amount": 60, + "timestamp": "2021-01-06T00:00:00", + }, + ] + ), + ) + + # do lookup on the Aggregated dataset + ts = pd.Series( + [ + datetime(2021, 1, 6, 0, 0, 0), + datetime(2021, 1, 6, 0, 0, 0), + datetime(2021, 1, 6, 0, 0, 0), + ] + ) + df, found = Aggregated.lookup(ts, uid=pd.Series([1, 2, 3])) + assert found.tolist() == [True, True, True] + assert df["uid"].tolist() == [1, 2, 3] + assert df["amounts"].tolist() == [[], [], [50, 60]] + + +@mock +def test_invalid_type(client): + with pytest.raises(Exception): + # docsnip incorrect_type + from fennel.datasets import dataset, field, pipeline, Dataset, FirstK + from fennel.dtypes import Continuous + from fennel.lib import inputs + from fennel.connectors import source, Webhook + + webhook = Webhook(name="webhook") + + @source(webhook.endpoint("Transaction"), disorder="14d", cdc="append") + @dataset + class Transaction: + uid: int + amount: int + timestamp: datetime + + @dataset + class Aggregated: + uid: int = field(key=True) + # docsnip-highlight next-line + amounts: int # should be List[int] + timestamp: datetime + + @pipeline + @inputs(Transaction) + def bad_pipeline(cls, ds: Dataset): + return ds.groupby("uid").aggregate( + # docsnip-highlight start + amounts=FirstK( + of="amount", + limit=10, + dedup=False, + window=Continuous("1d"), + ), + # docsnip-highlight end + ) + + # /docsnip diff --git a/docs/pages/api-reference/aggregations/firstk.md b/docs/pages/api-reference/aggregations/firstk.md new file mode 
100644 index 000000000..90db19b41 --- /dev/null +++ b/docs/pages/api-reference/aggregations/firstk.md @@ -0,0 +1,63 @@ +--- +title: FirstK +order: 0 +status: published +--- +### FirstK +Aggregation that computes a rolling list of the earliest values for each group +within a window. + +#### Parameters + +Name of the field in the input dataset over which the aggregation should be +computed. + + + +The continuous window within which aggregation needs to be computed. Possible +values are `"forever"` or any [time duration](/api-reference/data-types/duration). + + + +The name of the field in the output dataset that should store the result of this +aggregation. This field is expected to be of type `List[T]` where `T` is the type +of the field denoted by `of`. + + + +Since storing all the values for a group can get costly, FirstK expects a +`limit` to be specified which denotes the maximum size of the list that should +be maintained at any point. + + + +If set to True, only distinct values are stored else values stored in the first +can have duplicates too. + + +
+
+ +#### Returns + +Stores the result of the aggregation in the appropriate field of the output +dataset. + + + +#### Errors + +The column denoted by `into_field` in the output dataset must be of type `List[T]` +where T is the type of the column denoted by `of` in the input dataset. Commit error +is raised if this is not the case. + + +:::warning +Storing the full set of values and maintaining order between them can get costly, +so use this aggregation only when needed. +::: + +
+
diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 0ead05f39..006678caf 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.28] - 2024-09-21 +- Add FirstK aggregation + ## [1.5.27] - 2024-09-20 - Fix schema validation for int64 columns during mock client. Earlier, float64 was considered valid for optional[int64] fields to account for typing limitation diff --git a/fennel/__init__.py b/fennel/__init__.py index cbe921db3..2a29ba9c7 100644 --- a/fennel/__init__.py +++ b/fennel/__init__.py @@ -9,6 +9,7 @@ Distinct, Sum, LastK, + FirstK, Min, Max, Average, diff --git a/fennel/client_tests/test_social_network.py b/fennel/client_tests/test_social_network.py index 424543b51..13d9909ee 100644 --- a/fennel/client_tests/test_social_network.py +++ b/fennel/client_tests/test_social_network.py @@ -5,7 +5,7 @@ import pytest import fennel._vendor.requests as requests -from fennel import LastK +from fennel import LastK, FirstK from fennel.connectors import source, Webhook from fennel.datasets import dataset, field, Dataset, pipeline, Count from fennel.dtypes import regex, oneof, Continuous @@ -166,6 +166,27 @@ def last_viewed_post(cls, view_data: Dataset): ) +@meta(owner="ml-eng@myspace.com") +@dataset(index=True) +class FirstViewedPostByAgg: + user_id: str = field(key=True) + post_id: List[int] + time_stamp: datetime + + @pipeline + @inputs(ViewData) + def first_viewed_post(cls, view_data: Dataset): + return view_data.groupby("user_id").aggregate( + FirstK( + into_field="post_id", + of="post_id", + window=Continuous("forever"), + limit=1, + dedup=False, + ) + ) + + # --- Featuresets --- @@ -188,6 +209,9 @@ class UserFeatures: last_viewed_post2: List[int] = F( LastViewedPostByAgg.post_id, default=[-1] # type: ignore ) + first_viewed_post: List[int] = F( + FirstViewedPostByAgg.post_id, default=[-1] # type: ignore + ) @extractor(deps=[UserViewsDataset]) # type: ignore @inputs(Request.user_id) @@ -234,6 +258,7 @@ def 
test_social_network(client): UserCategoryDataset, LastViewedPost, LastViewedPostByAgg, + FirstViewedPostByAgg, ], featuresets=[Request, UserFeatures], ) @@ -307,6 +332,11 @@ def test_social_network(client): assert last_post_viewed == [936609766, 735291550] assert last_post_viewed2 == last_post_viewed + first_post_viewed = [ + x[0] for x in feature_df["UserFeatures.first_viewed_post"].to_list() + ] + assert first_post_viewed == [508698801, 43094523] + if client.is_integration_client(): return df = client.get_dataset_df("UserCategoryDataset") @@ -427,6 +457,7 @@ def test_social_network_with_mock_log(client): UserCategoryDataset, LastViewedPost, LastViewedPostByAgg, + FirstViewedPostByAgg, ], featuresets=[Request, UserFeatures], ) @@ -494,5 +525,10 @@ def test_social_network_with_mock_log(client): assert last_post_viewed == [936609766, 735291550] assert last_post_viewed2 == last_post_viewed + first_post_viewed = [ + x[0] for x in feature_df["UserFeatures.first_viewed_post"].to_list() + ] + assert first_post_viewed == [508698801, 43094523] + df = client.get_dataset_df("UserCategoryDataset") assert df.shape == (1998, 4) diff --git a/fennel/client_tests/test_struct_type.py b/fennel/client_tests/test_struct_type.py index a00ea88d1..0b9cdbe5c 100644 --- a/fennel/client_tests/test_struct_type.py +++ b/fennel/client_tests/test_struct_type.py @@ -7,7 +7,7 @@ import fennel._vendor.requests as requests from fennel import connectors from fennel.connectors import source -from fennel.datasets import dataset, Dataset, pipeline, field, LastK +from fennel.datasets import dataset, Dataset, pipeline, field, LastK, FirstK from fennel.dtypes import struct, Continuous from fennel.featuresets import featureset, extractor from fennel.lib import meta, inputs, outputs @@ -61,6 +61,29 @@ def movie_info(cls, movie_cast: Dataset): ) +@meta(owner="test@test.com") +@dataset(index=True) +class MovieInfoWithFirstK: + movie: Movie = field(key=True) + cast_list: List[Cast] + timestamp: datetime = 
field(timestamp=True) + + @pipeline + @inputs(MovieCast) + def movie_info(cls, movie_cast: Dataset): + return movie_cast.groupby("movie").aggregate( + [ + FirstK( + into_field="cast_list", + of="cast", + window=Continuous("forever"), + limit=3, + dedup=False, + ), + ] + ) + + @meta(owner="test@test.com") @featureset class MovieFeatures: @@ -89,6 +112,34 @@ def extract_average_cast_age(cls, ts: pd.Series, movie: pd.Series): return pd.Series(res["average_cast_age"]) +@meta(owner="test@test.com") +@featureset +class MovieFeaturesWithFirstK: + movie: Movie + cast_list: List[Cast] + average_cast_age: float + + @extractor(deps=[MovieInfoWithFirstK]) # type: ignore + @inputs("movie") + @outputs("cast_list") + def extract_cast(cls, ts: pd.Series, movie: pd.Series): + res, _ = MovieInfoWithFirstK.lookup(ts, movie=movie) # type: ignore + return pd.Series(res["cast_list"]) + + @extractor(deps=[MovieInfoWithFirstK]) # type: ignore + @inputs("movie") + @outputs("average_cast_age") + def extract_average_cast_age(cls, ts: pd.Series, movie: pd.Series): + res, _ = MovieInfoWithFirstK.lookup(ts, movie=movie) # type: ignore + res["total_cast_age"] = res["cast_list"].apply( + lambda x: sum([c.age for c in x]) + ) + res["average_cast_age"] = res["total_cast_age"] / res[ + "cast_list" + ].apply(lambda x: len(x)) + return pd.Series(res["average_cast_age"]) + + def log_movie_data(client): now = datetime.now(timezone.utc) data = [ @@ -188,3 +239,74 @@ def test_struct_type(client): assert df["MovieFeatures.average_cast_age"][0] == 40 assert df["MovieFeatures.average_cast_age"][1] == 45.5 + + +@pytest.mark.integration +@mock +def test_struct_type_with_firstk(client): + client.commit( + message="Initial commit", + datasets=[MovieCast, MovieInfoWithFirstK], + featuresets=[MovieFeaturesWithFirstK], + ) + # Log data to test the pipeline + log_movie_data(client) + + client.sleep() + + input_df = pd.DataFrame( + { + "MovieFeaturesWithFirstK.movie": [ + {"movie_id": 101, "title": "Inception"}, + 
{"movie_id": 102, "title": "Titanic"}, + ], + } + ) + df = client.query( + outputs=[ + MovieFeaturesWithFirstK.cast_list, + MovieFeaturesWithFirstK.average_cast_age, + ], + inputs=[MovieFeaturesWithFirstK.movie], + input_dataframe=input_df, + ) + + # Verify the returned dataframe + assert df.shape == (2, 2) + assert df.columns.tolist() == [ + "MovieFeaturesWithFirstK.cast_list", + "MovieFeaturesWithFirstK.average_cast_age", + ] + assert ( + len(df["MovieFeaturesWithFirstK.cast_list"][0]) == 2 + ) # 2 cast members for "Inception" + assert ( + len(df["MovieFeaturesWithFirstK.cast_list"][1]) == 2 + ) # 2 cast members for + # "Titanic" + ellen = Cast(name="Ellen Page", actor_id=2, age=34) + cast1 = df["MovieFeaturesWithFirstK.cast_list"][0][0] + assert cast1.name == ellen.name + assert cast1.actor_id == ellen.actor_id + assert cast1.age == ellen.age + + cast2 = df["MovieFeaturesWithFirstK.cast_list"][0][1] + leonardo = Cast(name="Leonardo DiCaprio", actor_id=1, age=46) + assert cast2.name == leonardo.name + assert cast2.actor_id == leonardo.actor_id + assert cast2.age == leonardo.age + + cast3 = df["MovieFeaturesWithFirstK.cast_list"][1][0] + kate = Cast(name="Kate Winslet", actor_id=3, age=45) + assert cast3.name == kate.name + assert cast3.actor_id == kate.actor_id + assert cast3.age == kate.age + + cast4 = df["MovieFeaturesWithFirstK.cast_list"][1][1] + assert cast4.name == leonardo.name + assert cast4.actor_id == leonardo.actor_id + assert cast4.age == leonardo.age + + # Test extract_average_cast_age extractor + assert df["MovieFeaturesWithFirstK.average_cast_age"][0] == 40 + assert df["MovieFeaturesWithFirstK.average_cast_age"][1] == 45.5 diff --git a/fennel/datasets/__init__.py b/fennel/datasets/__init__.py index 7701ffba7..2c9850aaa 100644 --- a/fennel/datasets/__init__.py +++ b/fennel/datasets/__init__.py @@ -3,6 +3,7 @@ Distinct, Sum, LastK, + FirstK, Min, Max, Average, diff --git a/fennel/datasets/aggregate.py b/fennel/datasets/aggregate.py index 
b0784c5b8..bef430600 100644 --- a/fennel/datasets/aggregate.py +++ b/fennel/datasets/aggregate.py @@ -264,7 +264,7 @@ class LastK(AggregateType): def to_proto(self): if self.window is None: - raise ValueError("Window must be specified for Distinct") + raise ValueError("Window must be specified for LastK") return spec_proto.PreSpec( last_k=spec_proto.LastK( window=self.window.to_proto(), @@ -280,6 +280,28 @@ def signature(self): return f"lastk_{self.of}_{self.window.signature()}" +class FirstK(AggregateType): + of: str + limit: int + dedup: bool + + def to_proto(self): + if self.window is None: + raise ValueError("Window must be specified for FirstK") + return spec_proto.PreSpec( + first_k=spec_proto.FirstK( + window=self.window.to_proto(), + name=self.into_field, + of=self.of, + limit=self.limit, + dedup=self.dedup, + ) + ) + + def signature(self): + return f"firstk_{self.of}_{self.window.signature()}" + + class Stddev(AggregateType): of: str default: float = -1.0 diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 9689b3656..54b9db3c0 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -38,6 +38,7 @@ Count, Distinct, LastK, + FirstK, Sum, Min, Max, @@ -691,6 +692,10 @@ def dsschema(self): dtype = fennel_get_optional_inner(dtype) list_type = get_python_type_from_pd(dtype) values[agg.into_field] = List[list_type] # type: ignore + elif isinstance(agg, FirstK): + dtype = input_schema.get_type(agg.of) + list_type = get_python_type_from_pd(dtype) + values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, Stddev): values[agg.into_field] = pd.Float64Dtype # type: ignore elif isinstance(agg, Quantile): @@ -2739,6 +2744,10 @@ def visitAggregate(self, obj) -> DSSchema: dtype = fennel_get_optional_inner(dtype) list_type = get_python_type_from_pd(dtype) values[agg.into_field] = List[list_type] # type: ignore + elif isinstance(agg, FirstK): + dtype = input_schema.get_type(agg.of) + list_type = 
get_python_type_from_pd(dtype) + values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, Min): dtype = input_schema.get_type(agg.of) if ( diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py index beff4b084..9fc4a8a8c 100644 --- a/fennel/datasets/test_dataset.py +++ b/fennel/datasets/test_dataset.py @@ -19,6 +19,7 @@ Sum, Quantile, ExpDecaySum, + FirstK, ) from fennel.dtypes import Embedding, Window, Continuous, Session from fennel.gen.services_pb2 import SyncRequest @@ -261,6 +262,7 @@ class UserAggregatesDataset: count: int avg_age: float stddev_age: float + countries: List[Optional[str]] @pipeline @inputs(UserInfoDataset) @@ -281,6 +283,13 @@ def create_aggregated_dataset(cls, user_info: Dataset): window=Continuous("forever"), into_field=str(cls.stddev_age), ), + FirstK( + of="country", + limit=3, + dedup=True, + window=Continuous("forever"), + into_field=str(cls.countries), + ), ] ) @@ -301,6 +310,16 @@ def create_aggregated_dataset(cls, user_info: Dataset): {"name": "count", "dtype": {"intType": {}}}, {"name": "avg_age", "dtype": {"doubleType": {}}}, {"name": "stddev_age", "dtype": {"doubleType": {}}}, + { + "name": "countries", + "dtype": { + "arrayType": { + "of": { + "optionalType": {"of": {"stringType": {}}} + } + } + }, + }, ] }, "timestamp": "timestamp", @@ -312,6 +331,7 @@ def create_aggregated_dataset(cls, user_info: Dataset): "count": {}, "gender": {}, "stddev_age": {}, + "countries": {}, "timestamp": {}, }, "pycode": {}, diff --git a/fennel/gen/auth_pb2.py b/fennel/gen/auth_pb2.py index 5fb3361ee..c51bec9ca 100644 --- a/fennel/gen/auth_pb2.py +++ b/fennel/gen/auth_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: auth.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/connector_pb2.py b/fennel/gen/connector_pb2.py index 5ae41e6b0..5e921636a 100644 --- a/fennel/gen/connector_pb2.py +++ b/fennel/gen/connector_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: connector.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/dataset_pb2.py b/fennel/gen/dataset_pb2.py index 30befc88d..e6176a5a7 100644 --- a/fennel/gen/dataset_pb2.py +++ b/fennel/gen/dataset_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: dataset.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/expectations_pb2.py b/fennel/gen/expectations_pb2.py index 67ed27576..a149c87e1 100644 --- a/fennel/gen/expectations_pb2.py +++ b/fennel/gen/expectations_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: expectations.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/expr_pb2.py b/fennel/gen/expr_pb2.py index 69df78aa2..85172f5c6 100644 --- a/fennel/gen/expr_pb2.py +++ b/fennel/gen/expr_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: expr.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/featureset_pb2.py b/fennel/gen/featureset_pb2.py index 20c5a4ba5..4df083d90 100644 --- a/fennel/gen/featureset_pb2.py +++ b/fennel/gen/featureset_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: featureset.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/format_pb2.py b/fennel/gen/format_pb2.py index 5e7459a77..a83bd4cd6 100644 --- a/fennel/gen/format_pb2.py +++ b/fennel/gen/format_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: format.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/http_auth_pb2.py b/fennel/gen/http_auth_pb2.py index a51d66082..34797b8dc 100644 --- a/fennel/gen/http_auth_pb2.py +++ b/fennel/gen/http_auth_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: http_auth.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/index_pb2.py b/fennel/gen/index_pb2.py index 9e3863f89..d7cfdffba 100644 --- a/fennel/gen/index_pb2.py +++ b/fennel/gen/index_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: index.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/kinesis_pb2.py b/fennel/gen/kinesis_pb2.py index 520ff7fd2..37fe3cc62 100644 --- a/fennel/gen/kinesis_pb2.py +++ b/fennel/gen/kinesis_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: kinesis.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/metadata_pb2.py b/fennel/gen/metadata_pb2.py index d9d355cc9..82b9ffc47 100644 --- a/fennel/gen/metadata_pb2.py +++ b/fennel/gen/metadata_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: metadata.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/pycode_pb2.py b/fennel/gen/pycode_pb2.py index d17598f79..d53c3edee 100644 --- a/fennel/gen/pycode_pb2.py +++ b/fennel/gen/pycode_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: pycode.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/schema_pb2.py b/fennel/gen/schema_pb2.py index e8d3e1501..4f56afb6f 100644 --- a/fennel/gen/schema_pb2.py +++ b/fennel/gen/schema_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: schema.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/schema_registry_pb2.py b/fennel/gen/schema_registry_pb2.py index 5bfb5b171..f1248b253 100644 --- a/fennel/gen/schema_registry_pb2.py +++ b/fennel/gen/schema_registry_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: schema_registry.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/services_pb2.py b/fennel/gen/services_pb2.py index 00770b71a..0bf595aed 100644 --- a/fennel/gen/services_pb2.py +++ b/fennel/gen/services_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: services.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/spec_pb2.py b/fennel/gen/spec_pb2.py index 826acaf17..72220935f 100644 --- a/fennel/gen/spec_pb2.py +++ b/fennel/gen/spec_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: spec.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -14,7 +15,7 @@ import fennel.gen.window_pb2 as window__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\xe1\x03\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 
\x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 
\x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"m\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 
\x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -22,25 +23,27 @@ if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None _globals['_PRESPEC']._serialized_start=48 - _globals['_PRESPEC']._serialized_end=529 - _globals['_SUM']._serialized_start=531 - _globals['_SUM']._serialized_end=607 - _globals['_AVERAGE']._serialized_start=609 - _globals['_AVERAGE']._serialized_end=706 - _globals['_COUNT']._serialized_start=709 - _globals['_COUNT']._serialized_end=837 - _globals['_LASTK']._serialized_start=839 - _globals['_LASTK']._serialized_end=965 - _globals['_MIN']._serialized_start=967 - _globals['_MIN']._serialized_end=1060 - _globals['_MAX']._serialized_start=1062 - _globals['_MAX']._serialized_end=1155 - _globals['_STDDEV']._serialized_start=1157 - _globals['_STDDEV']._serialized_end=1253 - _globals['_DISTINCT']._serialized_start=1255 - _globals['_DISTINCT']._serialized_end=1354 - _globals['_QUANTILE']._serialized_start=1357 - _globals['_QUANTILE']._serialized_end=1506 - 
_globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1508 - _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1633 + _globals['_PRESPEC']._serialized_end=575 + _globals['_SUM']._serialized_start=577 + _globals['_SUM']._serialized_end=653 + _globals['_AVERAGE']._serialized_start=655 + _globals['_AVERAGE']._serialized_end=752 + _globals['_COUNT']._serialized_start=755 + _globals['_COUNT']._serialized_end=883 + _globals['_LASTK']._serialized_start=885 + _globals['_LASTK']._serialized_end=1011 + _globals['_FIRSTK']._serialized_start=1013 + _globals['_FIRSTK']._serialized_end=1122 + _globals['_MIN']._serialized_start=1124 + _globals['_MIN']._serialized_end=1217 + _globals['_MAX']._serialized_start=1219 + _globals['_MAX']._serialized_end=1312 + _globals['_STDDEV']._serialized_start=1314 + _globals['_STDDEV']._serialized_end=1410 + _globals['_DISTINCT']._serialized_start=1412 + _globals['_DISTINCT']._serialized_end=1511 + _globals['_QUANTILE']._serialized_start=1514 + _globals['_QUANTILE']._serialized_end=1663 + _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1665 + _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1790 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/spec_pb2.pyi b/fennel/gen/spec_pb2.pyi index 7ad538ea7..aca068cfe 100644 --- a/fennel/gen/spec_pb2.pyi +++ b/fennel/gen/spec_pb2.pyi @@ -29,6 +29,7 @@ class PreSpec(google.protobuf.message.Message): DISTINCT_FIELD_NUMBER: builtins.int QUANTILE_FIELD_NUMBER: builtins.int EXP_DECAY_FIELD_NUMBER: builtins.int + FIRST_K_FIELD_NUMBER: builtins.int @property def sum(self) -> global___Sum: ... @property @@ -49,6 +50,8 @@ class PreSpec(google.protobuf.message.Message): def quantile(self) -> global___Quantile: ... @property def exp_decay(self) -> global___ExponentialDecayAggregate: ... + @property + def first_k(self) -> global___FirstK: ... 
def __init__( self, *, @@ -62,10 +65,11 @@ class PreSpec(google.protobuf.message.Message): distinct: global___Distinct | None = ..., quantile: global___Quantile | None = ..., exp_decay: global___ExponentialDecayAggregate | None = ..., + first_k: global___FirstK | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "exp_decay", b"exp_decay", "last_k", b"last_k", "max", b"max", "min", b"min", "quantile", b"quantile", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "exp_decay", b"exp_decay", "last_k", b"last_k", "max", b"max", "min", b"min", "quantile", b"quantile", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sum", "average", "count", "last_k", "min", "max", "stddev", "distinct", "quantile", "exp_decay"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "exp_decay", b"exp_decay", "first_k", b"first_k", "last_k", b"last_k", "max", b"max", "min", b"min", "quantile", b"quantile", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "exp_decay", b"exp_decay", "first_k", b"first_k", "last_k", b"last_k", "max", b"max", "min", b"min", "quantile", b"quantile", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> None: ... 
+ def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sum", "average", "count", "last_k", "min", "max", "stddev", "distinct", "quantile", "exp_decay", "first_k"] | None: ... global___PreSpec = PreSpec @@ -182,6 +186,35 @@ class LastK(google.protobuf.message.Message): global___LastK = LastK +@typing_extensions.final +class FirstK(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + OF_FIELD_NUMBER: builtins.int + NAME_FIELD_NUMBER: builtins.int + LIMIT_FIELD_NUMBER: builtins.int + DEDUP_FIELD_NUMBER: builtins.int + WINDOW_FIELD_NUMBER: builtins.int + of: builtins.str + name: builtins.str + limit: builtins.int + dedup: builtins.bool + @property + def window(self) -> window_pb2.Window: ... + def __init__( + self, + *, + of: builtins.str = ..., + name: builtins.str = ..., + limit: builtins.int = ..., + dedup: builtins.bool = ..., + window: window_pb2.Window | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["dedup", b"dedup", "limit", b"limit", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... + +global___FirstK = FirstK + @typing_extensions.final class Min(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/fennel/gen/status_pb2.py b/fennel/gen/status_pb2.py index cdc144a8b..f24662be1 100644 --- a/fennel/gen/status_pb2.py +++ b/fennel/gen/status_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: status.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/gen/window_pb2.py b/fennel/gen/window_pb2.py index ac2bb8d59..6b51a69dd 100644 --- a/fennel/gen/window_pb2.py +++ b/fennel/gen/window_pb2.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: window.proto +# Protobuf Python Version: 4.25.4 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool diff --git a/fennel/lib/aggregate/__init__.py b/fennel/lib/aggregate/__init__.py index 7d04d83b9..7416f55ae 100644 --- a/fennel/lib/aggregate/__init__.py +++ b/fennel/lib/aggregate/__init__.py @@ -4,6 +4,7 @@ Distinct, Sum, LastK, + FirstK, Min, Max, Average, diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index e43763971..01de14972 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -18,6 +18,7 @@ Sum, Average, LastK, + FirstK, Min, Max, Stddev, @@ -184,6 +185,43 @@ def get_val(self): return list(output[: self.k]) +class FirstKState(AggState): + def __init__(self, k, dedup): + self.k = k + self.dedeup = dedup + self.vals = [] + + def add_val_to_state(self, val, _ts): + self.vals.append(val) + if not self.dedeup: + return list(self.vals[: self.k]) + else: + to_ret = [] + for v in self.vals: + if v not in to_ret: + to_ret.append(v) + if len(to_ret) == self.k: + break + return list(to_ret[: self.k]) + + def del_val_from_state(self, val, _ts): + if val in self.vals: + self.vals.remove(val) + if not self.dedeup: + return list(self.vals[: self.k]) + + ret = [] + for v in self.vals: + if v not in ret: + ret.append(v) + if len(ret) == self.k: + break + return list(ret[: self.k]) + + def get_val(self): + return 
list(self.vals[: self.k]) + + class Heap: def __init__(self, heap_type="min"): self.elements = [] @@ -809,6 +847,8 @@ def get_aggregated_df( state[key] = LastKState( aggregate.limit, aggregate.dedup, aggregate.dropnull ) + elif isinstance(aggregate, FirstK): + state[key] = FirstKState(aggregate.limit, aggregate.dedup) elif isinstance(aggregate, Min): state[key] = MinState(aggregate.default) elif isinstance(aggregate, Max): diff --git a/fennel/testing/executor.py b/fennel/testing/executor.py index 72b4f1cea..2449b4cd8 100644 --- a/fennel/testing/executor.py +++ b/fennel/testing/executor.py @@ -23,6 +23,7 @@ Min, Max, LastK, + FirstK, Quantile, Stddev, ) @@ -347,7 +348,7 @@ def join_aggregated_dataset( final_df[aggregate.into_field] = final_df[ aggregate.into_field ].fillna(0) - if isinstance(aggregate, (LastK, Distinct)): + if isinstance(aggregate, (LastK, FirstK, Distinct)): # final_df[aggregate.into_field] = final_df[ # aggregate.into_field # ].fillna([]) diff --git a/fennel/testing/test_execute_aggregation.py b/fennel/testing/test_execute_aggregation.py index 8bdf47eb7..e00431bf8 100644 --- a/fennel/testing/test_execute_aggregation.py +++ b/fennel/testing/test_execute_aggregation.py @@ -9,6 +9,7 @@ CountState, CountUniqueState, LastKState, + FirstKState, MinState, MaxState, StddevState, @@ -98,6 +99,35 @@ def test_lastk_state_dedup(): assert state.del_val_from_state(4, now) == [1, 2] +def test_firstk_state(): + state = FirstKState(k=3, dedup=False) + now = datetime.now(timezone.utc) + assert state.add_val_to_state(1, now) == [1] + assert state.add_val_to_state(2, now) == [1, 2] + assert state.add_val_to_state(3, now) == [1, 2, 3] + assert state.add_val_to_state(4, now) == [1, 2, 3] + assert state.add_val_to_state(5, now) == [1, 2, 3] + + assert state.del_val_from_state(3, now) == [1, 2, 4] + assert state.del_val_from_state(4, now) == [1, 2, 5] + assert state.del_val_from_state(5, now) == [1, 2] + assert state.del_val_from_state(5, now) == [1, 2] + + +def 
test_firstk_state_dedup(): + state = FirstKState(k=3, dedup=True) + now = datetime.now(timezone.utc) + assert state.add_val_to_state(1, now) == [1] + assert state.add_val_to_state(2, now) == [1, 2] + assert state.add_val_to_state(1, now) == [1, 2] + assert state.add_val_to_state(3, now) == [1, 2, 3] + assert state.add_val_to_state(4, now) == [1, 2, 3] + assert state.add_val_to_state(1, now) == [1, 2, 3] + + assert state.del_val_from_state(3, now) == [1, 2, 4] + assert state.del_val_from_state(4, now) == [1, 2] + + def test_min_state(): state = MinState(default=3.0) now = datetime.now(timezone.utc) diff --git a/pyproject.toml b/pyproject.toml index c1fe29608..a3dea5f66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.26" +version = "1.5.28" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]