Skip to content

Commit

Permalink
Add support for indirections in preproc ref type for Avro format
Browse files Browse the repository at this point in the history
  • Loading branch information
saiharshavellanki committed Dec 13, 2024
1 parent 130d240 commit d756b3e
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/pages/api-reference/decorators/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ As of right now, there are three kinds of values of preproc:
a constant value.

:::info
Fennel supports preproc ref(str) values of type A[B][C] only for the JSON and Protobuf formats, and
Fennel supports preproc ref(str) values of type A[B][C] only for the JSON, Avro and Protobuf formats, and
A, B should be struct types. If you have data in another format or require indirection
for other parent types apart from struct, please reach out to Fennel support.
:::
Expand Down
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.60] - 2024-12-10
- Add support for indirections in preproc ref type for Avro format

## [1.5.59] - 2024-12-10
- Allow None as default value for min/max/avg/stddev aggregations.

Expand Down
3 changes: 2 additions & 1 deletion fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ def source(
conn.format is not None
and conn.format != "json"
and not isinstance(conn.format, Protobuf)
and not isinstance(conn.format, Avro)
):
raise ValueError(
"Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats"
"Preproc of type ref('A[B][C]') is applicable only for data in JSON, Protobuf and Avro formats"
)
else:
raise ValueError(
Expand Down
13 changes: 10 additions & 3 deletions fennel/connectors/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4130,7 +4130,7 @@ class UserInfoDataset:


def test_valid_preproc_value():
# Preproc value of type A[B][C] can be set for data in JSON and Protobuf formats
# Preproc value of type A[B][C] can be set for data in JSON, Avro and Protobuf formats
source(
s3.bucket(
bucket_name="all_ratings", prefix="prod/apac/", format="json"
Expand All @@ -4151,13 +4151,20 @@ def test_valid_preproc_value():
preproc={"C": "A[B][C]", "D": "A[B][C]"},
)

avro = Avro(
registry="confluent",
url="http://localhost:8000",
username="user",
password="pwd",
)
source(
kafka.topic(topic="topic", format="Avro"),
kafka.topic(topic="topic", format=avro),
every="1h",
disorder="14d",
cdc="debezium",
preproc={"C": "A[B][C]", "D": "A[B][C]"},
preproc={"C": ref("A[B][C]"), "D": ref("A[B][C]")},
)

source(
kafka.topic(topic="topic"),
every="1h",
Expand Down
14 changes: 0 additions & 14 deletions fennel/connectors/test_invalid_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,20 +709,6 @@ def test_invalid_preproc_value():
== str(e.value)
)

# Preproc value of type A[B][C] cannot be set for data other than JSON and Protobuf formats
with pytest.raises(ValueError) as e:
source(
kafka.topic(topic="topic", format="Avro"),
every="1h",
disorder="14d",
cdc="debezium",
preproc={"C": ref("A[B][C]"), "D": "A[B][C]"},
)
assert (
"Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats"
== str(e.value)
)

# Preproc value of type A[B][C] cannot be set for table sources
with pytest.raises(ValueError) as e:
source(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.59"
version = "1.5.60"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <[email protected]>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit d756b3e

Please sign in to comment.