Skip to content

Commit

Permalink
dropnull (#285)
Browse files Browse the repository at this point in the history
operator: add dropnull; improve docs for select, drop
  • Loading branch information
thaqibm authored Oct 16, 2023
1 parent d33f3bc commit 2da2958
Show file tree
Hide file tree
Showing 19 changed files with 583 additions and 121 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,4 @@ webhooks
WIP
WIP
YAML
dropnull
6 changes: 5 additions & 1 deletion docs/examples/api-reference/operators_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class UserTransactions:
@pipeline(version=1)
@inputs(Activity)
def create_user_transactions(cls, activity: Dataset):
# docsnip dropnull
dropnull_amounts = activity.dropnull("amount")
# /docsnip

# docsnip transform
def extract_info(df: pd.DataFrame) -> pd.DataFrame:
df_json = df["metadata"].apply(json.loads).apply(pd.Series)
Expand All @@ -55,7 +59,7 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame:
["merchant_id", "transaction_amount", "user_id", "timestamp"]
]

transformed_ds = activity.transform(
transformed_ds = dropnull_amounts.transform(
extract_info,
schema={
"transaction_amount": float,
Expand Down
30 changes: 27 additions & 3 deletions docs/pages/api-reference/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,45 @@ Fennel allows you to drop columns from a dataset using the `drop` operator.

The `drop` operator has the following parameters:

1. `columns: List[str]` - positional argument, that specifies the list of columns to drop from the dataset.
1. `columns: List[str]` - keyword or positional argument, that specifies the list of columns to drop from the dataset.
- You can also use `*args` to pass the column names for added flexibility.

:::info
Fennel does not allow you to drop keys or timestamp columns from a dataset.
:::

<pre snippet="api-reference/operators_ref#drop"></pre>


### Dropnull

Fennel allows you to drop null values from columns of a dataset using the `dropnull` operator.
`dropnull` changes the type of the specified columns from `Optional[T]` to `T`.

The `dropnull` operator has the following parameters:

1. `columns: List[str]` - keyword or positional argument that specifies the list of columns from which null values should be filtered out.
- You can also use `*args` to pass the column names for added flexibility.

Either one of `*args` or `columns` can be provided as an argument for `dropnull`.
If no arguments are given, `columns` will be all fields with the type `Optional[T]` in the dataset.

:::info
Fennel only allows you to call `dropnull` on columns with an `Optional` type.
:::

<pre snippet="api-reference/operators_ref#dropnull"></pre>

### Select

Fennel allows you to drop columns from a dataset using the `select` operator.
Fennel allows you to select columns from a dataset using the `select` operator.

The `select` operator has the following parameters:

1. `*args: str` - positional arguments that specify the list of fields to select.
1. `columns: List[str]` - keyword argument, that specifies the list of columns to select from the dataset.
- You can also use `*args` to pass the column names for added flexibility.

Either one of `*args` or `columns` must be provided as an argument for `select`.

:::info
Fennel requires you to select all key fields, with the timestamp column automatically included.
Expand Down
89 changes: 87 additions & 2 deletions fennel/client_tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,31 @@ def get_info(cls, info: Dataset):
class UserInfoDatasetDerivedSelect:
user_id: int = field(key=True).meta(description="User ID") # type: ignore
name: str = field().meta(description="User name") # type: ignore
country_name: Optional[str]
country_name: str
ts: datetime = field(timestamp=True)

@pipeline(version=1)
@inputs(UserInfoDataset)
def get_info(cls, info: Dataset):
x = info.rename({"country": "country_name", "timestamp": "ts"})
return x.select("user_id", "name", "country_name")
return x.select("user_id", "name", "country_name").dropnull(
"country_name"
)


@meta(owner="[email protected]")
@dataset
class UserInfoDatasetDerivedDropnull:
user_id: int = field(key=True).meta(description="User ID") # type: ignore
name: str = field().meta(description="User name") # type: ignore
country: str
age: int
timestamp: datetime = field(timestamp=True)

@pipeline(version=1)
@inputs(UserInfoDataset)
def get_info(cls, info: Dataset):
return info.dropnull()


class TestDataset(unittest.TestCase):
Expand Down Expand Up @@ -103,6 +120,9 @@ def test_simple_select_rename(self, client):
data = [
[18232, "Ross", 32, "USA", now],
[18234, "Monica", 24, "Chile", yesterday],
[18235, "Jessica", 24, None, yesterday],
[18236, "Jane", 24, None, yesterday],
[18237, "Chandler", 24, None, yesterday],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
df = pd.DataFrame(data, columns=columns)
Expand Down Expand Up @@ -134,6 +154,71 @@ def test_simple_select_rename(self, client):
assert all(x in df.columns for x in ["user_id", "name", "country_name"])
assert "age" not in df.columns

@pytest.mark.integration
@mock
def test_simple_drop_null(self, client):
    """End-to-end test of the `dropnull` operator: log rows containing
    null `age`/`country` values, then verify that the raw dataset keeps
    them while the derived dropnull dataset does not surface those rows
    on lookup (their fields come back as None / not found)."""
    # Sync the dataset
    client.sync(datasets=[UserInfoDataset, UserInfoDatasetDerivedDropnull])
    now = datetime.now()
    # Rows 18234 (null age) and 18235-18237 (null country) exercise the
    # dropnull path; 18232 and 18238 are fully populated controls.
    data = [
        [18232, "Ross", 32, "USA", now],
        [18234, "Monica", None, "Chile", now],
        [18235, "Jessica", 24, None, now],
        [18236, "Jane", 24, None, now],
        [18237, "Chandler", 24, None, now],
        [18238, "Mike", 32, "UK", now],
    ]
    columns = ["user_id", "name", "age", "country", "timestamp"]
    df = pd.DataFrame(data, columns=columns)
    # Nullable integer dtype so None survives in the integer `age` column.
    df["age"] = df["age"].astype(pd.Int64Dtype())
    response = client.log("fennel_webhook", "UserInfoDataset", df)
    assert response.status_code == requests.codes.OK, response.json()

    # Do lookup on UserInfoDataset
    if client.is_integration_client():
        # Give the integration backend time to ingest before looking up.
        client.sleep()
    ts = pd.Series([now, now, now, now, now, now])
    user_id_keys = pd.Series([18232, 18234, 18235, 18236, 18237, 18238])

    # The raw dataset should return every logged row, nulls included.
    df, found = UserInfoDataset.lookup(ts, user_id=user_id_keys)
    assert df.shape == (6, 5)
    assert df["user_id"].tolist() == [
        18232,
        18234,
        18235,
        18236,
        18237,
        18238,
    ]
    assert df["name"].tolist() == [
        "Ross",
        "Monica",
        "Jessica",
        "Jane",
        "Chandler",
        "Mike",
    ]
    assert df["age"].tolist() == [32, None, 24, 24, 24, 32]
    assert df["country"].tolist() == [
        "USA",
        "Chile",
        None,
        None,
        None,
        "UK",
    ]

    # Do lookup on UserInfoDatasetDerived
    # Rows with any null in a dropnull'd column are absent from the
    # derived dataset, so their looked-up fields are None.
    df, found = UserInfoDatasetDerivedDropnull.lookup(
        ts, user_id=user_id_keys
    )

    # TODO (thaqib)
    assert df.shape == (6, 5)
    assert df["user_id"].tolist() == user_id_keys.tolist()
    assert df["name"].tolist() == ["Ross", None, None, None, None, "Mike"]
    assert df["country"].tolist() == ["USA", None, None, None, None, "UK"]

@pytest.mark.integration
@mock
def test_simple_drop_rename(self, client):
Expand Down
99 changes: 86 additions & 13 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import inspect
import sys
from dataclasses import dataclass
import typing

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -57,6 +58,7 @@
FENNEL_STRUCT_SRC_CODE,
FENNEL_STRUCT_DEPENDENCIES_SRC_CODE,
)

from fennel.sources.sources import DataConnector, source
from fennel.utils import (
fhash,
Expand Down Expand Up @@ -119,19 +121,7 @@ def meta(self, **kwargs: Any) -> T: # type: ignore
return f

def is_optional(self) -> bool:
def _get_origin(type_: Any) -> Any:
return getattr(type_, "__origin__", None)

def _get_args(type_: Any) -> Any:
return getattr(type_, "__args__", None)

if (
_get_origin(self.dtype) is Union
and type(None) is _get_args(self.dtype)[1]
):
return True

return False
return fennel_is_optional(self.dtype)

def fqn(self) -> str:
return f"{self.dataset_name}.{self.name}"
Expand Down Expand Up @@ -241,6 +231,16 @@ def drop(self, *args, columns: Optional[List[str]] = None) -> _Node:
drop_cols = _Node.__get_drop_args(*args, columns=columns)
return self.__drop(drop_cols)

def dropnull(self, *args, columns: Optional[List[str]] = None) -> _Node:
    """Return a DropNull node over this one.

    Columns may be given positionally or via the ``columns`` keyword;
    when neither is supplied, every Optional column of the current
    schema is targeted.
    """
    if not args and columns is None:
        # No explicit columns: default to all Optional fields.
        target_cols = self.dsschema().get_optional_cols()
    else:
        target_cols = _Node.__get_drop_args(
            *args, columns=columns, name="dropnull"
        )
    return DropNull(self, target_cols)

def select(self, *args, columns: Optional[List[str]] = None) -> _Node:
cols = _Node.__get_drop_args(*args, columns=columns, name="select")
ts = self.dsschema().timestamp
Expand Down Expand Up @@ -693,6 +693,23 @@ def name(self):
return self.__name


class DropNull(_Node):
    """Pipeline node that narrows the given columns from Optional[T] to T
    in the schema of its input node."""

    def __init__(self, node: _Node, columns: List[str]):
        super().__init__()
        self.node = node
        self.columns = columns
        # Register this node as a consumer of its input.
        self.node.out_edges.append(self)

    def signature(self):
        # Hash of the input signature plus the targeted columns.
        return fhash(self.node.signature(), self.columns)

    def dsschema(self):
        # Work on a copy so the upstream schema is never mutated.
        schema = copy.deepcopy(self.node.dsschema())
        for col in self.columns:
            schema.drop_null_column(col)
        return schema


# ---------------------------------------------------------------------
# dataset & pipeline decorators
# ---------------------------------------------------------------------
Expand Down Expand Up @@ -893,6 +910,17 @@ def wrap(c: Type[T]) -> Dataset:
return wrap(cls)


def fennel_is_optional(type_):
    """Return True if ``type_`` admits ``None``, i.e. it is a ``Union``
    containing ``NoneType`` (``Optional[T]``, ``Union[..., None]``).

    Checks membership of ``NoneType`` across all union arguments rather
    than only index 1, so multi-arg unions such as ``Union[int, str, None]``
    (and any argument ordering) are classified correctly.
    """
    return typing.get_origin(type_) is Union and type(None) in typing.get_args(
        type_
    )


def fennel_get_optional_inner(type_):
    """Strip ``None`` from an optional type and return the inner type.

    For ``Optional[T]`` this returns ``T`` (same as the previous
    ``get_args(...)[0]`` behavior). Generalized: for a multi-arg union
    such as ``Union[int, str, None]`` it returns ``Union[int, str]``
    instead of silently discarding all but the first argument. A type
    with no non-None arguments is returned unchanged.
    """
    non_none = tuple(
        a for a in typing.get_args(type_) if a is not type(None)
    )
    if not non_none:
        # Nothing to narrow to (e.g. a non-generic type); return as-is.
        return type_
    if len(non_none) == 1:
        return non_none[0]
    return typing.Union[non_none]


# Fennel implementation of get_type_hints which does not error on forward
# references not being types such as Embedding[4].
def f_get_type_hints(obj):
Expand Down Expand Up @@ -1456,6 +1484,8 @@ def visit(self, obj):
return self.visitExplode(obj)
elif isinstance(obj, First):
return self.visitFirst(obj)
elif isinstance(obj, DropNull):
return self.visitDropNull(obj)
elif isinstance(obj, Assign):
return self.visitAssign(obj)
else:
Expand Down Expand Up @@ -1497,6 +1527,9 @@ def visitExplode(self, obj):
def visitFirst(self, obj):
raise NotImplementedError()

def visitDropNull(self, obj):
    # Base visitor hook for DropNull nodes; concrete visitor subclasses
    # must override this to handle the node.
    raise NotImplementedError()

def visitAssign(self, obj):
raise NotImplementedError()

Expand Down Expand Up @@ -1540,6 +1573,23 @@ def rename_column(self, old_name: str, new_name: str):
f"field {old_name} not found in schema of {self.name}"
)

def get_optional_cols(self) -> List[str]:
    """Return the names of all columns whose declared type is Optional."""
    optional_cols: List[str] = []
    for col_name, col_type in self.schema().items():
        if fennel_is_optional(col_type):
            optional_cols.append(col_name)
    return optional_cols

def drop_null_column(self, name: str):
    """Narrow column ``name`` from Optional[T] to T in this schema.

    Raises if the column is the timestamp field or not present at all.
    """
    if name in self.keys:
        self.keys[name] = fennel_get_optional_inner(self.keys[name])
        return
    if name in self.values:
        self.values[name] = fennel_get_optional_inner(self.values[name])
        return
    # Timestamp check only after key/value lookup, matching lookup order.
    if name == self.timestamp:
        raise Exception(
            f"cannot drop_null on timestamp field {name} of {self.name}"
        )
    raise Exception(f"field {name} not found in schema of {self.name}")

def append_value_column(self, name: str, type_: Type):
if name in self.keys:
raise Exception(
Expand Down Expand Up @@ -1956,6 +2006,29 @@ def visitDrop(self, obj) -> DSSchema:
output_schema.name = output_schema_name
return output_schema

def visitDropNull(self, obj):
    """Validate a DropNull node and return its output schema.

    Every target column must exist in the input schema, must not be the
    timestamp column, and must currently have an Optional type.
    """
    input_schema = copy.deepcopy(self.visit(obj.node))
    output_schema_name = f"'[Pipeline:{self.pipeline_name}]->dropnull node'"
    if not obj.columns:
        raise ValueError(
            f"invalid dropnull - {output_schema_name} must have at least one column"
        )
    for col in obj.columns:
        if col not in input_schema.schema() or col == input_schema.timestamp:
            raise ValueError(
                f"invalid dropnull column {col} not present in {input_schema.name}"
            )
        col_type = input_schema.get_type(col)
        if not fennel_is_optional(col_type):
            raise ValueError(
                f"invalid dropnull {col} has type {col_type} expected Optional type"
            )
    output_schema = obj.dsschema()
    output_schema.name = output_schema_name
    return output_schema

def visitDedup(self, obj) -> DSSchema:
input_schema = self.visit(obj.node)
output_schema_name = (
Expand Down
Loading

0 comments on commit 2da2958

Please sign in to comment.