Skip to content

Commit

Permalink
Assign in preproc
Browse files Browse the repository at this point in the history
  • Loading branch information
nonibansal committed Sep 2, 2024
1 parent ba94c86 commit 17030de
Show file tree
Hide file tree
Showing 25 changed files with 1,019 additions and 384 deletions.
62 changes: 35 additions & 27 deletions docs/examples/api-reference/sources/source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
from datetime import datetime

from fennel.testing import mock
Expand All @@ -14,36 +15,43 @@ def test_source_decorator(client):
os.environ["SCHEMA_REGISTRY_USERNAME"] = "test"
os.environ["SCHEMA_REGISTRY_PASSWORD"] = "test"
# docsnip source_decorator
from fennel.connectors import source, S3, ref
import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.datasets import dataset, field

s3 = S3(name="my_s3") # using IAM role based access

bucket = s3.bucket("data", path="user/*/date-%Y-%m-%d/*", format="parquet")

# docsnip-highlight start
@source(
bucket,
every="1h",
cdc="upsert",
disorder="2d",
since=datetime(2021, 1, 1, 3, 30, 0), # 3:30 AM on 1st Jan 2021
until=datetime(2022, 1, 1, 0, 0, 0), # 12:00 AM on 1st Jan 2022
preproc={
"uid": ref("user_id"), # 'uid' comes from column 'user_id'
"country": "USA", # country for every row should become 'USA'
},
env="prod",
bounded=True,
idleness="1h",
)
# docsnip-highlight end
@dataset
class User:
uid: int = field(key=True)
email: str
country: str
timestamp: datetime

# /docsnip
client.commit(message="some commit msg", datasets=[User])
if sys.version_info >= (3, 10):
# docsnip-highlight start
@source(
bucket,
every="1h",
cdc="upsert",
disorder="2d",
since=datetime(2021, 1, 1, 3, 30, 0), # 3:30 AM on 1st Jan 2021
until=datetime(2022, 1, 1, 0, 0, 0), # 12:00 AM on 1st Jan 2022
preproc={
"uid": ref("user_id"), # 'uid' comes from column 'user_id'
"country": "USA", # country for every row should become 'USA'
"age": eval(
lambda x: pd.to_numeric(x["age"]).astype(int),
schema={"age": str},
), # converting age dtype to int
},
env="prod",
bounded=True,
idleness="1h",
)
# docsnip-highlight end
@dataset
class User:
uid: int = field(key=True)
email: str
country: str
age: int
timestamp: datetime

# /docsnip
client.commit(message="some commit msg", datasets=[User])
8 changes: 7 additions & 1 deletion docs/pages/api-reference/decorators/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,19 @@ them to commit depending on the environment.
When present, specifies the preproc behavior for the columns referred to by the
keys of the dictionary.

As of right now, there are two kinds of values of preproc:
As of right now, there are three kinds of values of preproc:
* `ref: Ref`: written as `ref(str)` and means that the column denoted
by the key of this value is aliased to another column in the sourced data. This
is useful, for instance, when you want to rename columns while bringing them
to Fennel. With this, you can also perform indirections of kind A[B][C] and
rename them while bringing to fennel.

* `eval: Eval`: written as `eval(Callable | Expr, schema: Dict[str, Type])` and means that the column denoted
by the key of this value computed either through python callable or rust expressions. This
is useful, for instance, when you want to change dtype of a column, add a new column using another column
or fill nulls in the columns with some value to Fennel. The schema parameter is used to specify schema of
columns which is needed for evaluation but not present in dataset.

* `Any`: means that the column denoted by the key of this value should be given
a constant value.

Expand Down
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.16] - 2024-08-29
- Add assign in preproc.

## [1.5.15] - 2024-09-01
- Allow version to be used as field in dataset

Expand Down
154 changes: 154 additions & 0 deletions fennel/client_tests/test_preproc_assign.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import sys
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
import pytest

from fennel.connectors import Webhook, source, eval
from fennel.datasets import (
dataset,
field,
)
from fennel.expr import col, lit
from fennel.testing import mock

__owner__ = "[email protected]"
webhook = Webhook(name="fennel_webhook")


@pytest.mark.integration
@mock
def test_simple_preproc_assign(client):
if sys.version_info >= (3, 10):

@source(
webhook.endpoint("A1"),
cdc="upsert",
disorder="14d",
preproc={
"age": eval(
col("buggy_age") + lit(1), schema={"buggy_age": int}
),
"height": eval(
lambda x: x["height_str"].apply(lambda y: int(y)),
schema={"height_str": str},
),
},
)
@dataset(index=True)
class A1:
user_id: int = field(key=True)
age: int
height: int
t: datetime

client.commit(datasets=[A1], message="first_commit")

now = datetime.now(timezone.utc)
df = pd.DataFrame(
{
"user_id": [1, 2, 3, 4, 5],
"buggy_age": [10, 11, 12, 13, 14],
"height_str": ["10", "11", "12", "13", "14"],
"t": [now, now, now, now, now],
}
)

client.log("fennel_webhook", "A1", df)
client.sleep(60)

df, _ = client.lookup(
A1,
keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5]}),
)
assert df["age"].tolist() == [11, 12, 13, 14, 15]
assert df["height"].tolist() == [10, 11, 12, 13, 14]


@pytest.mark.integration
@mock
def test_remove_null_preproc_assign(client):
if sys.version_info >= (3, 10):

@source(
webhook.endpoint("A1"),
cdc="upsert",
disorder="14d",
preproc={
"user_id": eval(
col("user_id").fillnull(-1).astype(int),
schema={"user_id": Optional[int]},
),
},
where=lambda x: x["user_id"] != -1,
)
@dataset(index=True)
class A1:
user_id: int = field(key=True)
height: int
t: datetime

client.commit(datasets=[A1], message="first_commit")

now = datetime.now(timezone.utc)
df = pd.DataFrame(
{
"user_id": [1, 2, 3, 4, 5, None],
"height": [10, 11, 12, 13, 14, 15],
"t": [now, now, now, now, now, now],
}
)
df["user_id"] = df["user_id"].astype(pd.Int64Dtype())

client.log("fennel_webhook", "A1", df)
client.sleep(60)

df, _ = client.lookup(
A1,
keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5, -1]}),
)
assert df["height"].tolist() == [10, 11, 12, 13, 14, pd.NA]


@pytest.mark.integration
@mock
def test_change_dtype_preproc_assign(client):
if sys.version_info >= (3, 10):

@source(
webhook.endpoint("A1"),
cdc="upsert",
disorder="14d",
preproc={
"age": eval(
lambda x: pd.to_numeric(x["age"]),
schema={"age": str},
),
},
)
@dataset(index=True)
class A1:
user_id: int = field(key=True)
age: int
t: datetime

client.commit(datasets=[A1], message="first_commit")

now = datetime.now(timezone.utc)
df = pd.DataFrame(
{
"user_id": [1, 2, 3, 4, 5],
"age": ["10", "11", "12", "13", "14"],
"t": [now, now, now, now, now],
}
)

client.log("fennel_webhook", "A1", df)
client.sleep(60)

df, _ = client.lookup(
A1,
keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5]}),
)
assert df["age"].tolist() == [10, 11, 12, 13, 14]
2 changes: 2 additions & 0 deletions fennel/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@
ref,
PreProcValue,
at_timestamp,
Eval,
eval,
)
import fennel.connectors.kinesis as kinesis
42 changes: 39 additions & 3 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, List, Optional, TypeVar, Union, Tuple, Dict
from typing import Literal
from typing import (
Any,
Callable,
List,
Optional,
TypeVar,
Union,
Tuple,
Dict,
Type,
Literal,
)

from fennel._vendor.pydantic import BaseModel, Field # type: ignore
from fennel._vendor.pydantic import validator # type: ignore
from fennel.connectors.kinesis import at_timestamp
from fennel.expr.expr import Expr, TypedExpr
from fennel.internal_lib.duration import (
Duration,
)
Expand All @@ -32,11 +43,27 @@ class Ref(BaseModel):
name: str


@dataclass
class Eval:
eval_type: Union[Callable, Expr, TypedExpr]
additional_schema: Optional[Dict[str, Type]] = None

class Config:
arbitrary_types_allowed = True


def ref(ref_name: str) -> PreProcValue:
return Ref(name=ref_name)


PreProcValue = Union[Ref, Any]
def eval(
eval_type: Union[Callable, Expr, TypedExpr],
schema: Optional[Dict[str, Type]] = None,
) -> PreProcValue:
return Eval(eval_type=eval_type, additional_schema=schema)


PreProcValue = Union[Ref, Any, Eval]


def preproc_has_indirection(preproc: Optional[Dict[str, PreProcValue]]):
Expand Down Expand Up @@ -906,3 +933,12 @@ def __init__(self, data_source: DataSource, topic_id: str, format: str):

def identifier(self) -> str:
return f"{self.data_source.identifier()}(topic={self.topic_id}, format={self.format})"


def is_table_source(con: DataConnector) -> bool:
if isinstance(
con,
(KinesisConnector, PubSubConnector, KafkaConnector, WebhookConnector),
):
return False
return True
Loading

0 comments on commit 17030de

Please sign in to comment.