Skip to content

Commit

Permalink
added support for sampling (#590)
Browse files Browse the repository at this point in the history
* added support for sampling

added 1 more unit test and fixed lint

updated documentation

* updated logic to address comments

* addressed comments

* addressed comments

---------

Co-authored-by: Hemanth Kannekanti <[email protected]>
  • Loading branch information
hemanthk269 and Hemanth Kannekanti authored Oct 22, 2024
1 parent 37568cf commit 546b749
Show file tree
Hide file tree
Showing 10 changed files with 692 additions and 94 deletions.
2 changes: 2 additions & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ SSD
SSDs
SSL
SSO
Sample
SearchRequest
ShardIteratorType
Signifier
Expand Down Expand Up @@ -336,6 +337,7 @@ repo
rowset
rowsets
runtime
sample
sasl
scalability
scalable
Expand Down
2 changes: 2 additions & 0 deletions docs/examples/api-reference/sources/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def test_source_decorator(client):
import pandas as pd
from fennel.connectors import source, S3, ref, eval
from fennel.datasets import dataset, field
from fennel.connectors.connectors import Sample

s3 = S3(name="my_s3") # using IAM role based access

Expand All @@ -41,6 +42,7 @@ def test_source_decorator(client):
), # converting age dtype to int
},
env="prod",
sample=Sample(0.2, using=["email"]),
bounded=True,
idleness="1h",
)
Expand Down
15 changes: 15 additions & 0 deletions docs/pages/api-reference/decorators/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ can ever arrive. And if such rows do arrive, Fennel has the liberty of discardin
them and not including them in the computation.
</Expandable>

<Expandable title="sample" type='Optional[float] | Optional[Sample]'>
When specifying sampling for a dataset, it can be provided in two ways:
1. **Simply specify the sampling rate** when you want to sample the dataset without specifying the columns used for sampling.
- **Sampling Rate**: A float between 0 and 1 that determines the proportion of the dataset to include.
2. **Use the `Sample` object** when you want to specify both the sampling rate and the specific columns used for sampling.
- **Sampling Rate**: A float between 0 and 1 that determines the proportion of the dataset to include.
- **Using**: A list of columns used to hash for sampling the data. Preproc columns and the timestamp field cannot be included in this list.

Default Behavior When No Columns Are Specified
1. For Keyed Datasets:
All key columns are used for sampling, excluding any preproc columns.
2. For Non-Keyed Datasets:
All columns are used for sampling except for the timestamp and preproc columns.
</Expandable>

<Expandable title="cdc" type='"append" | "native" | "debezium"'>
Specifies how should valid change data be constructed from the ingested data.

Expand Down
36 changes: 36 additions & 0 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ def preproc_has_indirection(preproc: Optional[Dict[str, PreProcValue]]):
return False


class Sample:
rate: float
using: List[str]

def __init__(self, rate, using):
if rate < 0 or rate > 1:
raise ValueError("Sample rate should be between 0 and 1")
if using is None or len(using) == 0:
raise ValueError(
f"Using must be a non-empty list, try using sample={rate} instead"
)
self.rate = rate
self.using = using


def source(
conn: DataConnector,
disorder: Duration,
Expand All @@ -106,6 +121,7 @@ def source(
bounded: bool = False,
idleness: Optional[Duration] = None,
where: Optional[Callable] = None,
sample: Optional[Union[float, Sample]] = None,
) -> Callable[[T], Any]:
"""
Decorator to specify the source of data for a dataset. The source can be
Expand Down Expand Up @@ -184,6 +200,24 @@ def source(
"idleness parameter should not be passed when bounded is set as False"
)

if sample is not None:
if isinstance(sample, float):
if sample < 0 or sample > 1:
raise ValueError("Sample rate should be between 0 and 1")
elif isinstance(sample, Sample):
disallowed_columns = []
if preproc is not None:
disallowed_columns.extend(list(preproc.keys()))
for column in sample.using:
if column in disallowed_columns:
raise ValueError(
f"Column {column} is part of preproc so cannot be used for sampling"
)
else:
raise ValueError(
"Sample should be either a float or a Sample object"
)

def decorator(dataset_cls: T):
connector = copy.deepcopy(conn)
connector.every = every if every is not None else DEFAULT_EVERY
Expand All @@ -196,6 +230,7 @@ def decorator(dataset_cls: T):
connector.bounded = bounded
connector.idleness = idleness
connector.where = where
connector.sample = sample
connectors = getattr(dataset_cls, SOURCE_FIELD, [])
connectors.append(connector)
setattr(dataset_cls, SOURCE_FIELD, connectors)
Expand Down Expand Up @@ -711,6 +746,7 @@ class DataConnector:
bounded: bool = False
idleness: Optional[Duration] = None
where: Optional[Callable] = None
sample: Optional[Union[float, Sample]] = None
how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
create: Optional[bool] = None
renames: Optional[Dict[str, str]] = {}
Expand Down
Loading

0 comments on commit 546b749

Please sign in to comment.