DEV-3032, DEV-3033 Introduce S3 path parameter (#367)
zsid60 authored Jan 31, 2024
1 parent 51d9002 commit ea36267
Showing 10 changed files with 281 additions and 40 deletions.
17 changes: 16 additions & 1 deletion docs/examples/api-reference/source.py
@@ -60,7 +60,12 @@ class UserPostgresSourcedDataset:
)


@source(s3.bucket("engagement", prefix="notion"), every="30m")
@source(
s3.bucket(
"engagement", path="notion/*/%Y/%m/%d/hour=%H/*/*", format="json"
),
every="30m",
)
@meta(owner="[email protected]")
@dataset
class UserS3SourcedDataset:
@@ -70,6 +75,16 @@ class UserS3SourcedDataset:
...


@source(s3.bucket("engagement", prefix="notion"), every="30m")
@meta(owner="[email protected]")
@dataset
class UserS3SourcedDatasetWithPrefix:
uid: int = field(key=True)
email: str
timestamp: datetime
...


# /docsnip

# docsnip snowflake_source
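To build intuition for the `path` argument used by `UserS3SourcedDataset` above, the strftime parts of the pattern can be rendered for a concrete timestamp to see the shape of the object keys it matches. A minimal sketch using only the standard library (the timestamp is illustrative):

```python
from datetime import datetime

# Only the strftime codes (%Y/%m/%d/%H) are substituted; the * parts remain
# wildcards, each matching a single path part.
pattern = "notion/*/%Y/%m/%d/hour=%H/*/*"
ts = datetime(2024, 1, 31, 9)  # illustrative timestamp
print(ts.strftime(pattern))  # notion/*/2024/01/31/hour=09/*/*
```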
24 changes: 18 additions & 6 deletions docs/pages/api-reference/sources.md
@@ -71,14 +71,26 @@ Fennel creates a special role with name prefixed by `FennelDataAccessRole-` in y
The following fields need to be defined on the bucket:

1. **`bucket`** - Name of the S3 bucket where the file(s) exist.
2. **`prefix`** (optional) - By providing a path-like prefix (e.g., `myFolder/thisTable/`) under which all the relevant
files sit, we can optimize finding these in S3. This is optional but recommended if your bucket contains many
folders/files.
3. **`format`** (optional) - The format of the files you'd like to replicate. You can choose between CSV (default),
Avro, Hudi, Parquet, and JSON.
4. **`delimiter`** (optional) - the character delimiting individual cells in the CSV data. The default value is `","`
2. **`path`** (optional) - **At most one of `prefix` or `path` should be specified.** A pattern describing the path structure
of the objects in S3. When such structure exists in your bucket, providing it is highly recommended. From it, Fennel can infer
the partitioning scheme within the bucket, which the system uses to optimize ingestion. The pattern specified is a
path-like string consisting of parts separated by `/`. The valid path parts are:
- a static string (of alphanumeric characters, underscores, hyphens, or dots)
- the `*` wildcard (note this must be the entire path part: `*/*` is valid but `foo*/` is not)
- a string with a [strftime format specifier](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes) (e.g., `yyyymmdd=%Y%m%d`)

For example, if your bucket has the structure `orders/country={country}/date={date}/store={store}/{file}.json`, provide the
path `orders/*/date=%Y%m%d/*/*`.
3. **`prefix`** (optional) - **At most one of `prefix` or `path` should be specified.** By providing a path-like prefix
(e.g., `myFolder/thisTable/`) under which all the relevant files sit, we can optimize finding these in S3.
This is optional but recommended if your bucket contains many folders/files.
4. **`format`** (optional) - The format of the files you'd like to replicate. You can choose between CSV (default),
Avro, Hudi, Parquet, and JSON.
5. **`delimiter`** (optional) - the character delimiting individual cells in the CSV data. The default value is `","`
and if overridden, this can only be a 1-character string. For example, to use tab-delimited data enter `"\t"`.

Here are two examples, one using `path` and the other using `prefix`:
<pre snippet="api-reference/source#s3_source"></pre>
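As a hedged illustration of the split described above, the `S3Connector.parse_path` helper introduced in this commit turns the `orders` pattern into a static listing prefix plus a partition-matching suffix:

```python
from fennel.sources.sources import S3Connector

# Static leading parts become the prefix; everything from the first wildcard
# or strftime specifier onward becomes the suffix.
prefix, suffix = S3Connector.parse_path("orders/*/date=%Y%m%d/*/*")
print(prefix)  # orders/
print(suffix)  # */date=%Y%m%d/*/*
```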


3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [0.20.16] - 2024-01-30
- Introduce `path` parameter to the S3 source

## [0.20.15] - 2024-01-30
- Improved casting to timestamp in case of epoch

48 changes: 24 additions & 24 deletions fennel/gen/connector_pb2.py

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion fennel/gen/connector_pb2.pyi
@@ -536,13 +536,15 @@ class S3Table(google.protobuf.message.Message):
FORMAT_FIELD_NUMBER: builtins.int
DB_FIELD_NUMBER: builtins.int
PRE_SORTED_FIELD_NUMBER: builtins.int
PATH_SUFFIX_FIELD_NUMBER: builtins.int
bucket: builtins.str
path_prefix: builtins.str
delimiter: builtins.str
format: builtins.str
@property
def db(self) -> global___ExtDatabase: ...
pre_sorted: builtins.bool
path_suffix: builtins.str
def __init__(
self,
*,
@@ -552,9 +554,10 @@
format: builtins.str = ...,
db: global___ExtDatabase | None = ...,
pre_sorted: builtins.bool = ...,
path_suffix: builtins.str = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["db", b"db"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["bucket", b"bucket", "db", b"db", "delimiter", b"delimiter", "format", b"format", "path_prefix", b"path_prefix", "pre_sorted", b"pre_sorted"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["bucket", b"bucket", "db", b"db", "delimiter", b"delimiter", "format", b"format", "path_prefix", b"path_prefix", "path_suffix", b"path_suffix", "pre_sorted", b"pre_sorted"]) -> None: ...

global___S3Table = S3Table

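A small sketch of the new proto field in use, assuming the generated module is imported the same way as elsewhere in this diff (values are illustrative):

```python
import fennel.gen.connector_pb2 as connector_proto

# path_suffix travels alongside the existing path_prefix on S3Table.
table = connector_proto.S3Table(
    bucket="engagement",
    path_prefix="notion/",
    path_suffix="*/%Y/%m/%d/hour=%H/*/*",
    format="json",
)
print(table.path_suffix)  # */%Y/%m/%d/hour=%H/*/*
```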
7 changes: 6 additions & 1 deletion fennel/lib/to_proto/to_proto.py
@@ -712,6 +712,7 @@ def _s3_conn_to_source_proto(
ext_db,
bucket=connector.bucket_name,
path_prefix=connector.path_prefix,
path_suffix=connector.path_suffix,
delimiter=connector.delimiter,
format=connector.format,
presorted=connector.presorted,
@@ -748,14 +749,17 @@ def _s3_to_ext_table_proto(
db: connector_proto.ExtDatabase,
bucket: Optional[str],
path_prefix: Optional[str],
path_suffix: Optional[str],
delimiter: str,
format: str,
presorted: bool,
) -> connector_proto.ExtTable:
if bucket is None:
raise ValueError("bucket must be specified")
if path_prefix is None:
raise ValueError("path_prefix must be specified")
raise ValueError("prefix or path must be specified")
if not path_suffix:
path_suffix = ""

return connector_proto.ExtTable(
s3_table=connector_proto.S3Table(
Expand All @@ -765,6 +769,7 @@ def _s3_to_ext_table_proto(
delimiter=delimiter,
format=format,
pre_sorted=presorted,
path_suffix=path_suffix,
)
)

83 changes: 78 additions & 5 deletions fennel/sources/sources.py
@@ -3,6 +3,7 @@
import json
from datetime import datetime
from enum import Enum
import re

from typing import Any, Callable, List, Optional, TypeVar, Union, Tuple, Dict

@@ -141,7 +142,8 @@ class S3(DataSource):
def bucket(
self,
bucket_name: str,
prefix: str,
prefix: Optional[str] = None,
path: Optional[str] = None,
delimiter: str = ",",
format: str = "csv",
presorted: bool = False,
@@ -150,13 +152,14 @@
self,
bucket_name,
prefix,
path,
delimiter,
format,
presorted,
)

def required_fields(self) -> List[str]:
return ["bucket", "prefix"]
return ["bucket"]

@staticmethod
def get(name: str) -> S3:
@@ -340,7 +343,7 @@ class Kinesis(DataSource):
def stream(
self,
stream_arn: str,
init_position: str | at_timestamp,
init_position: str | at_timestamp | datetime | int | float,
format: str,
) -> KinesisConnector:
return KinesisConnector(self, stream_arn, init_position, format)
@@ -431,7 +434,8 @@ def identifier(self) -> str:

class S3Connector(DataConnector):
bucket_name: Optional[str]
path_prefix: Optional[str]
path_prefix: str
path_suffix: Optional[str] = None
delimiter: str = ","
format: str = "csv"
presorted: bool = False
@@ -441,13 +445,13 @@ def __init__(
data_source,
bucket_name,
path_prefix,
path,
delimiter,
format,
presorted,
):
self.data_source = data_source
self.bucket_name = bucket_name
self.path_prefix = path_prefix
self.delimiter = delimiter
self.format = format
self.presorted = presorted
@@ -467,6 +471,17 @@ def __init__(
if self.format == "csv" and self.delimiter not in [",", "\t", "|"]:
raise (ValueError("delimiter must be one of [',', '\t', '|']"))

# Only one of a prefix or path pattern can be specified. If a path is specified,
# the prefix and suffix are parsed and validated from it
if path and path_prefix:
raise AttributeError("path and prefix cannot be specified together")
elif path_prefix:
self.path_prefix = path_prefix
elif path:
self.path_prefix, self.path_suffix = S3Connector.parse_path(path)
else:
raise AttributeError("either path or prefix must be specified")

def identifier(self) -> str:
return (
f"{self.data_source.identifier()}(bucket={self.bucket_name}"
@@ -479,6 +494,64 @@ def creds(self) -> Tuple[Optional[str], Optional[str]]:
self.data_source.aws_secret_access_key,
)

@staticmethod
def parse_path(path: str) -> Tuple[str, str]:
"""Parse a path of form "foo/bar/date=%Y-%m-%d/*" into a prefix and a suffix"""

ends_with_slash = path.endswith("/")
# Strip out the ending slash
if ends_with_slash:
path = path[:-1]
parts = path.split("/")
suffix_portion = False
prefix = []
suffix = []
for i, part in enumerate(parts):
if part == "*" or part == "**":
# Wildcard
suffix_portion = True
suffix.append(part)
elif "*" in part:
# *.file-extension is allowed in the last path part only
if i != len(parts) - 1:
raise ValueError(
f"Invalid path part {part}. The * wildcard must be a complete path part except for an ending file extension."
)
pattern = r"^\*\.[a-zA-Z0-9.]+$"
if not re.match(pattern, part):
raise ValueError(
f"Invalid path part {part}. The ending path part must be the * wildcard, of the form *.file-extension, a strftime format string, or static string"
)
suffix_portion = True
suffix.append(part)
elif "%" in part:
suffix_portion = True
# ensure we have a valid strftime format specifier
try:
formatted = datetime.now().strftime(part)
datetime.strptime(formatted, part)
suffix.append(part)
except ValueError:
raise ValueError(
f"Invalid path part {part}. Invalid datetime format specifier"
)
else:
pattern = r"^[a-zA-Z0-9._\-]+$"
if not re.match(pattern, part):
raise ValueError(
f"Invalid path part {part}. All path parts must contain alphanumeric characters, hyphens, underscores, dots, the * or ** wildcards, or strftime format specifiers."
)
# static part. Can be part of prefix until we see a wildcard
if suffix_portion:
suffix.append(part)
else:
prefix.append(part)
prefix_str, suffix_str = "/".join(prefix), "/".join(suffix)
# Add the slash to the end of the prefix if it originally had a slash
if len(prefix_str) > 0 and (len(suffix_str) > 0 or ends_with_slash):
prefix_str = prefix_str + "/"
return prefix_str, suffix_str


class KinesisConnector(DataConnector):
def __init__(
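To make the prefix/suffix split concrete, here is a hedged sketch of `parse_path` on a few cases implied by the implementation above (expected outputs derived by tracing the code, not taken from the committed tests):

```python
from fennel.sources.sources import S3Connector

# Docstring example: static parts before the first specifier form the prefix.
print(S3Connector.parse_path("foo/bar/date=%Y-%m-%d/*"))  # ('foo/bar/', 'date=%Y-%m-%d/*')

# A *.extension wildcard is valid only as the final path part.
print(S3Connector.parse_path("data/*.json"))  # ('data/', '*.json')

# An all-static path keeps its trailing slash and has an empty suffix.
print(S3Connector.parse_path("foo/bar/"))  # ('foo/bar/', '')
```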
32 changes: 31 additions & 1 deletion fennel/sources/test_invalid_sources.py
@@ -14,7 +14,7 @@
Kinesis,
)
from fennel.sources.kinesis import at_timestamp
from fennel.sources.sources import BigQuery
from fennel.sources.sources import BigQuery, S3Connector

# noinspection PyUnresolvedReferences
from fennel.test_lib import *
@@ -327,6 +327,36 @@ def test_invalid_s3_format():
assert "delimiter must be one of" in str(e.value)


def test_invalid_s3_path():
# Exactly one of path and prefix is allowed
with pytest.raises(AttributeError) as e:
s3.bucket(
bucket_name="bucket", prefix="prefix", path="prefix/suffix/*.json"
)
assert "path and prefix cannot be specified together" == str(e.value)

with pytest.raises(AttributeError) as e:
s3.bucket(bucket_name="bucket")
assert "either path or prefix must be specified" == str(e.value)

invalid_path_cases = [
("foo*/bar*/", "* wildcard must be a complete path part"),
("foo/bar*", "of the form *.file-extension"),
("foo/*-file.csv", "of the form *.file-extension"),
("foo$/bar1/*", "alphanumeric characters, hyphens,"),
("date-%q/*", "Invalid datetime"),
("//foo/bar", "alphanumeric characters, hyphens,"),
("*year=%Y/*", "* wildcard must be a complete path part"),
("year=%*/*", "* wildcard must be a complete path part"),
]

for path, error_str in invalid_path_cases:
with pytest.raises(ValueError) as e:
S3Connector.parse_path(path)
assert "Invalid path part" in str(e.value)
assert error_str in str(e.value)


def test_invalid_pre_proc():
@source(
s3.bucket(
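As a hedged companion to the invalid cases above, these valid patterns would be expected to parse cleanly; the expected tuples are derived from the `parse_path` logic in this commit and are not part of the committed test file:

```python
from fennel.sources.sources import S3Connector

valid_path_cases = [
    ("orders/*/date=%Y%m%d/*/*", ("orders/", "*/date=%Y%m%d/*/*")),
    ("year=%Y/month=%m/*", ("", "year=%Y/month=%m/*")),
    ("notion/*/%Y/%m/%d/hour=%H/*/*", ("notion/", "*/%Y/%m/%d/hour=%H/*/*")),
]

for path, expected in valid_path_cases:
    assert S3Connector.parse_path(path) == expected
```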
