From 0ed3dbd9a7aa07643d8f3638453fafbe87a89622 Mon Sep 17 00:00:00 2001 From: Vellanki Sai Harsha Date: Fri, 13 Dec 2024 13:37:50 +0530 Subject: [PATCH] Add support for indirections in preproc ref type for Avro format (#615) --- docs/pages/api-reference/decorators/source.md | 2 +- fennel/CHANGELOG.md | 3 +++ fennel/connectors/connectors.py | 3 ++- fennel/connectors/test_connectors.py | 13 ++++++++++--- fennel/connectors/test_invalid_connectors.py | 14 -------------- pyproject.toml | 2 +- 6 files changed, 17 insertions(+), 20 deletions(-) diff --git a/docs/pages/api-reference/decorators/source.md b/docs/pages/api-reference/decorators/source.md index 675a250e..8bc3be2c 100644 --- a/docs/pages/api-reference/decorators/source.md +++ b/docs/pages/api-reference/decorators/source.md @@ -120,7 +120,7 @@ As of right now, there are three kinds of values of preproc: a constant value. :::info -Fennel supports preproc ref(str) values of type A[B][C] only for the JSON and Protobuf formats, and +Fennel supports preproc ref(str) values of type A[B][C] only for the JSON, Avro and Protobuf formats, and A, B should be struct types. If you have data in other format or require indirection for other parent types apart from struct, please reach out to Fennel support. ::: diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 17395a64..632ea51c 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.60] - 2024-12-10 +- Add support for indirections in preproc ref type for Avro format + ## [1.5.59] - 2024-12-10 - Allow None as default value for min/max/avg/stddev aggregations. diff --git a/fennel/connectors/connectors.py b/fennel/connectors/connectors.py index a5c65c61..f6155c3a 100644 --- a/fennel/connectors/connectors.py +++ b/fennel/connectors/connectors.py @@ -174,9 +174,10 @@ def source( conn.format is not None and conn.format != "json" and not isinstance(conn.format, Protobuf) + and not isinstance(conn.format, Avro) ): raise ValueError( - "Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats" + "Preproc of type ref('A[B][C]') is applicable only for data in JSON, Protobuf and Avro formats" ) else: raise ValueError( diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py index d4f008b8..7f8829cc 100644 --- a/fennel/connectors/test_connectors.py +++ b/fennel/connectors/test_connectors.py @@ -4130,7 +4130,7 @@ class UserInfoDataset: def test_valid_preproc_value(): - # Preproc value of type A[B][C] can be set for data in JSON and Protobuf formats + # Preproc value of type A[B][C] can be set for data in JSON, Avro Protobuf formats source( s3.bucket( bucket_name="all_ratings", prefix="prod/apac/", format="json" @@ -4151,13 +4151,20 @@ def test_valid_preproc_value(): preproc={"C": "A[B][C]", "D": "A[B][C]"}, ) + avro = Avro( + registry="confluent", + url="http://localhost:8000", + username="user", + password="pwd", + ) source( - kafka.topic(topic="topic", format="Avro"), + kafka.topic(topic="topic", format=avro), every="1h", disorder="14d", cdc="debezium", - preproc={"C": "A[B][C]", "D": "A[B][C]"}, + preproc={"C": ref("A[B][C]"), "D": ref("A[B][C]")}, ) + source( kafka.topic(topic="topic"), every="1h", diff --git a/fennel/connectors/test_invalid_connectors.py b/fennel/connectors/test_invalid_connectors.py index b0037083..4e91cc52 100644 --- a/fennel/connectors/test_invalid_connectors.py +++ b/fennel/connectors/test_invalid_connectors.py @@ -709,20 +709,6 @@ def test_invalid_preproc_value(): == str(e.value) ) - # Preproc value of type A[B][C] cannot be set for data other than JSON and Protobuf formats - with pytest.raises(ValueError) as e: - source( - kafka.topic(topic="topic", format="Avro"), - every="1h", - disorder="14d", - cdc="debezium", - preproc={"C": ref("A[B][C]"), "D": "A[B][C]"}, - ) - assert ( - "Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats" - == str(e.value) - ) - # Preproc value of type A[B][C] cannot be set for table sources with pytest.raises(ValueError) as e: source( diff --git a/pyproject.toml b/pyproject.toml index 3111cc18..fc910e0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.59" +version = "1.5.60" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]