diff --git a/docs/examples/api-reference/sources/source.py b/docs/examples/api-reference/sources/source.py index d201ccd85..837b41285 100644 --- a/docs/examples/api-reference/sources/source.py +++ b/docs/examples/api-reference/sources/source.py @@ -1,4 +1,5 @@ import os +import sys from datetime import datetime from fennel.testing import mock @@ -14,36 +15,43 @@ def test_source_decorator(client): os.environ["SCHEMA_REGISTRY_USERNAME"] = "test" os.environ["SCHEMA_REGISTRY_PASSWORD"] = "test" # docsnip source_decorator - from fennel.connectors import source, S3, ref + import pandas as pd + from fennel.connectors import source, S3, ref, eval from fennel.datasets import dataset, field s3 = S3(name="my_s3") # using IAM role based access bucket = s3.bucket("data", path="user/*/date-%Y-%m-%d/*", format="parquet") - # docsnip-highlight start - @source( - bucket, - every="1h", - cdc="upsert", - disorder="2d", - since=datetime(2021, 1, 1, 3, 30, 0), # 3:30 AM on 1st Jan 2021 - until=datetime(2022, 1, 1, 0, 0, 0), # 12:00 AM on 1st Jan 2022 - preproc={ - "uid": ref("user_id"), # 'uid' comes from column 'user_id' - "country": "USA", # country for every row should become 'USA' - }, - env="prod", - bounded=True, - idleness="1h", - ) - # docsnip-highlight end - @dataset - class User: - uid: int = field(key=True) - email: str - country: str - timestamp: datetime - - # /docsnip - client.commit(message="some commit msg", datasets=[User]) + if sys.version_info >= (3, 10): + # docsnip-highlight start + @source( + bucket, + every="1h", + cdc="upsert", + disorder="2d", + since=datetime(2021, 1, 1, 3, 30, 0), # 3:30 AM on 1st Jan 2021 + until=datetime(2022, 1, 1, 0, 0, 0), # 12:00 AM on 1st Jan 2022 + preproc={ + "uid": ref("user_id"), # 'uid' comes from column 'user_id' + "country": "USA", # country for every row should become 'USA' + "age": eval( + lambda x: pd.to_numeric(x["age"]).astype(int), + schema={"age": str}, + ), # converting age dtype to int + }, + env="prod", + bounded=True, + idleness="1h", + ) + # docsnip-highlight end + @dataset + class User: + uid: int = field(key=True) + email: str + country: str + age: int + timestamp: datetime + + # /docsnip + client.commit(message="some commit msg", datasets=[User]) diff --git a/docs/pages/api-reference/decorators/source.md b/docs/pages/api-reference/decorators/source.md index 8da65a47d..1978cdc78 100644 --- a/docs/pages/api-reference/decorators/source.md +++ b/docs/pages/api-reference/decorators/source.md @@ -88,13 +88,19 @@ them to commit depending on the environment. When present, specifies the preproc behavior for the columns referred to by the keys of the dictionary. -As of right now, there are two kinds of values of preproc: +As of right now, there are three kinds of values of preproc: * `ref: Ref`: written as `ref(str)` and means that the column denoted by the key of this value is aliased to another column in the sourced data. This is useful, for instance, when you want to rename columns while bringing them to Fennel. With this, you can also perform indirections of kind A[B][C] and rename them while bringing to fennel. +* `eval: Eval`: written as `eval(Callable | Expr, schema: Dict[str, Type])` and means that the column denoted + by the key of this value computed either through python callable or rust expressions. This + is useful, for instance, when you want to change dtype of a column, add a new column using another column + or fill nulls in the columns with some value to Fennel. 
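+
+  For example, a minimal sketch of `eval` in `preproc` (the column names here are
+  purely illustrative):
+
+  ```python
+  import pandas as pd
+  from fennel.connectors import eval
+  from fennel.expr import col, lit
+
+  preproc = {
+      # expression-based: derive `age` from another numeric column
+      "age": eval(col("buggy_age") + lit(1), schema={"buggy_age": int}),
+      # callable-based: cast a string column to a numeric one
+      "height": eval(
+          lambda df: pd.to_numeric(df["height_str"]),
+          schema={"height_str": str},
+      ),
+  }
+  ```
+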
The schema parameter is used to specify schema of + columns which is needed for evaluation but not present in dataset. + * `Any`: means that the column denoted by the key of this value should be given a constant value. diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 7d700f59a..a1b71b6c5 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.16] - 2024-08-29 +- Add assign in preproc. + ## [1.5.15] - 2024-09-01 - Allow version to be used as field in dataset diff --git a/fennel/client_tests/test_preproc_assign.py b/fennel/client_tests/test_preproc_assign.py new file mode 100644 index 000000000..47f11e0ad --- /dev/null +++ b/fennel/client_tests/test_preproc_assign.py @@ -0,0 +1,154 @@ +import sys +from datetime import datetime, timezone +from typing import Optional + +import pandas as pd +import pytest + +from fennel.connectors import Webhook, source, eval +from fennel.datasets import ( + dataset, + field, +) +from fennel.expr import col, lit +from fennel.testing import mock + +__owner__ = "nitin@fennel.ai" +webhook = Webhook(name="fennel_webhook") + + +@pytest.mark.integration +@mock +def test_simple_preproc_assign(client): + if sys.version_info >= (3, 10): + + @source( + webhook.endpoint("A1"), + cdc="upsert", + disorder="14d", + preproc={ + "age": eval( + col("buggy_age") + lit(1), schema={"buggy_age": int} + ), + "height": eval( + lambda x: x["height_str"].apply(lambda y: int(y)), + schema={"height_str": str}, + ), + }, + ) + @dataset(index=True) + class A1: + user_id: int = field(key=True) + age: int + height: int + t: datetime + + client.commit(datasets=[A1], message="first_commit") + + now = datetime.now(timezone.utc) + df = pd.DataFrame( + { + "user_id": [1, 2, 3, 4, 5], + "buggy_age": [10, 11, 12, 13, 14], + "height_str": ["10", "11", "12", "13", "14"], + "t": [now, now, now, now, now], + } + ) + + client.log("fennel_webhook", "A1", df) + client.sleep(60) + + df, _ = client.lookup( + A1, + keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5]}), + ) + assert df["age"].tolist() == [11, 12, 13, 14, 15] + assert df["height"].tolist() == [10, 11, 12, 13, 14] + + +@pytest.mark.integration +@mock +def test_remove_null_preproc_assign(client): + if sys.version_info >= (3, 10): + + @source( + webhook.endpoint("A1"), + cdc="upsert", + disorder="14d", + preproc={ + "user_id": eval( + col("user_id").fillnull(-1).astype(int), + schema={"user_id": Optional[int]}, + ), + }, + where=lambda x: x["user_id"] != -1, + ) + @dataset(index=True) + class A1: + user_id: int = field(key=True) + height: int + t: datetime + + client.commit(datasets=[A1], message="first_commit") + + now = datetime.now(timezone.utc) + df = pd.DataFrame( + { + "user_id": [1, 2, 3, 4, 5, None], + "height": [10, 11, 12, 13, 14, 15], + "t": [now, now, now, now, now, now], + } + ) + df["user_id"] = df["user_id"].astype(pd.Int64Dtype()) + + client.log("fennel_webhook", "A1", df) + client.sleep(60) + + df, _ = client.lookup( + A1, + keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5, -1]}), + ) + assert df["height"].tolist() == [10, 11, 12, 13, 14, pd.NA] + + +@pytest.mark.integration +@mock +def test_change_dtype_preproc_assign(client): + if sys.version_info >= (3, 10): + + @source( + webhook.endpoint("A1"), + cdc="upsert", + disorder="14d", + preproc={ + "age": eval( + lambda x: pd.to_numeric(x["age"]), + schema={"age": str}, + ), + }, + ) + @dataset(index=True) + class A1: + user_id: int = field(key=True) + age: int + t: datetime + + client.commit(datasets=[A1], message="first_commit") + + now = 
datetime.now(timezone.utc) + df = pd.DataFrame( + { + "user_id": [1, 2, 3, 4, 5], + "age": ["10", "11", "12", "13", "14"], + "t": [now, now, now, now, now], + } + ) + + client.log("fennel_webhook", "A1", df) + client.sleep(60) + + df, _ = client.lookup( + A1, + keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5]}), + ) + assert df["age"].tolist() == [10, 11, 12, 13, 14] diff --git a/fennel/connectors/__init__.py b/fennel/connectors/__init__.py index 9aa2eca2f..0a3476884 100644 --- a/fennel/connectors/__init__.py +++ b/fennel/connectors/__init__.py @@ -29,5 +29,7 @@ ref, PreProcValue, at_timestamp, + Eval, + eval, ) import fennel.connectors.kinesis as kinesis diff --git a/fennel/connectors/connectors.py b/fennel/connectors/connectors.py index 4d82a2dc9..c414536f0 100644 --- a/fennel/connectors/connectors.py +++ b/fennel/connectors/connectors.py @@ -4,12 +4,23 @@ import re from dataclasses import dataclass from datetime import datetime, timezone -from typing import Any, Callable, List, Optional, TypeVar, Union, Tuple, Dict -from typing import Literal +from typing import ( + Any, + Callable, + List, + Optional, + TypeVar, + Union, + Tuple, + Dict, + Type, + Literal, +) from fennel._vendor.pydantic import BaseModel, Field # type: ignore from fennel._vendor.pydantic import validator # type: ignore from fennel.connectors.kinesis import at_timestamp +from fennel.expr.expr import Expr, TypedExpr from fennel.internal_lib.duration import ( Duration, ) @@ -32,11 +43,27 @@ class Ref(BaseModel): name: str +@dataclass +class Eval: + eval_type: Union[Callable, Expr, TypedExpr] + additional_schema: Optional[Dict[str, Type]] = None + + class Config: + arbitrary_types_allowed = True + + def ref(ref_name: str) -> PreProcValue: return Ref(name=ref_name) -PreProcValue = Union[Ref, Any] +def eval( + eval_type: Union[Callable, Expr, TypedExpr], + schema: Optional[Dict[str, Type]] = None, +) -> PreProcValue: + return Eval(eval_type=eval_type, additional_schema=schema) + + +PreProcValue = Union[Ref, Any, Eval] def preproc_has_indirection(preproc: Optional[Dict[str, PreProcValue]]): @@ -906,3 +933,12 @@ def __init__(self, data_source: DataSource, topic_id: str, format: str): def identifier(self) -> str: return f"{self.data_source.identifier()}(topic={self.topic_id}, format={self.format})" + + +def is_table_source(con: DataConnector) -> bool: + if isinstance( + con, + (KinesisConnector, PubSubConnector, KafkaConnector, WebhookConnector), + ): + return False + return True diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py index d8943fb18..96253c1d3 100644 --- a/fennel/connectors/test_connectors.py +++ b/fennel/connectors/test_connectors.py @@ -2,6 +2,7 @@ from datetime import datetime from typing import Optional +import pandas as pd from google.protobuf.json_format import ParseDict # type: ignore import fennel.gen.connector_pb2 as connector_proto @@ -22,9 +23,11 @@ ref, S3Connector, PubSub, + eval, ) from fennel.connectors.connectors import CSV from fennel.datasets import dataset, field, pipeline, Dataset +from fennel.expr import col, lit from fennel.lib import meta from fennel.lib.params import inputs @@ -2365,3 +2368,152 @@ class UserInfoDataset: assert source_request == expected_source_request, error_message( source_request, expected_source_request ) + + +def test_assign_python_preproc(): + if sys.version_info >= (3, 10): + + @source( + mysql.table( + "users", + cursor="added_on", + ), + every="1h", + disorder="20h", + bounded=True, + idleness="1h", + cdc="upsert", + preproc={"age": 
eval(lambda x: pd.to_numeric(x["age_str"]))}, + ) + @meta(owner="test@test.com") + @dataset + class UserInfoDataset: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + age: int + age_str: str + timestamp: datetime = field(timestamp=True) + + view = InternalTestClient() + view.add(UserInfoDataset) + sync_request = view._get_sync_request_proto() + + assert len(sync_request.sources) == 1 + source_request = sync_request.sources[0] + s = { + "table": { + "mysqlTable": { + "db": { + "name": "mysql", + "mysql": { + "host": "localhost", + "database": "test", + "user": "root", + "password": "root", + "port": 3306, + }, + }, + "tableName": "users", + } + }, + "dataset": "UserInfoDataset", + "dsVersion": 1, + "every": "3600s", + "cursor": "added_on", + "disorder": "72000s", + "timestampField": "timestamp", + "cdc": "Upsert", + "bounded": True, + "idleness": "3600s", + "preProc": {"age": {"eval": {"pycode": {}}}}, + } + expected_source_request = ParseDict(s, connector_proto.Source()) + source_request.pre_proc.get("age").eval.pycode.Clear() + assert source_request == expected_source_request, error_message( + source_request, expected_source_request + ) + + +def test_assign_eval_preproc(): + + @source( + mysql.table( + "users", + cursor="added_on", + ), + every="1h", + disorder="20h", + bounded=True, + idleness="1h", + cdc="upsert", + preproc={ + "age": eval( + (col("val1") * col("val2") + lit(1)), + schema={"val1": int, "val2": int}, + ) + }, + ) + @meta(owner="test@test.com") + @dataset + class UserInfoDataset: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + age: int + timestamp: datetime = field(timestamp=True) + + view = InternalTestClient() + view.add(UserInfoDataset) + sync_request = view._get_sync_request_proto() + + assert len(sync_request.sources) == 1 + source_request = sync_request.sources[0] + s = { + "table": { + "mysqlTable": { + "db": { + "name": "mysql", + "mysql": { + "host": "localhost", + "database": "test", + "user": "root", + "password": "root", + "port": 3306, + }, + }, + "tableName": "users", + } + }, + "dataset": "UserInfoDataset", + "dsVersion": 1, + "every": "3600s", + "cursor": "added_on", + "disorder": "72000s", + "timestampField": "timestamp", + "cdc": "Upsert", + "bounded": True, + "idleness": "3600s", + "preProc": { + "age": { + "eval": { + "expr": {}, + "schema": { + "fields": [ + {"name": "val1", "dtype": {"intType": {}}}, + {"name": "val2", "dtype": {"intType": {}}}, + ] + }, + } + } + }, + } + expected_source_request = ParseDict(s, connector_proto.Source()) + source_request.pre_proc.get("age").eval.expr.Clear() + assert source_request == expected_source_request, error_message( + source_request, expected_source_request + ) diff --git a/fennel/connectors/test_invalid_connectors.py b/fennel/connectors/test_invalid_connectors.py index de70b44d1..c60f7a749 100644 --- a/fennel/connectors/test_invalid_connectors.py +++ b/fennel/connectors/test_invalid_connectors.py @@ -2,7 +2,7 @@ from lib2to3.fixes.fix_tuple_params import tuple_name from typing import Optional -from fennel.connectors.connectors import Protobuf, ref +from fennel.connectors.connectors import Protobuf, ref, eval import pytest from fennel.connectors import ( @@ -20,6 +20,7 @@ S3Connector, ) from fennel.datasets import dataset, field +from fennel.expr import col from fennel.lib import meta # noinspection PyUnresolvedReferences @@ -835,3 +836,110 @@ def test_invalid_s3_batch_sink(): "S3 sink only supports data access 
through Fennel DataAccess IAM Role" == str(e.value) ) + + +@mock +def test_invalid_timestamp_assign_preproc(client): + + @source( + mysql.table( + "users", + cursor="added_on", + ), + every="1h", + disorder="20h", + bounded=True, + idleness="1h", + cdc="upsert", + preproc={"timestamp": eval(col("val1"), schema={"val1": datetime})}, + ) + @meta(owner="test@test.com") + @dataset + class UserInfoDataset: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + age: int + timestamp: datetime = field(timestamp=True) + + # Setting timestamp field through assign preproc + with pytest.raises(ValueError) as e: + client.commit(datasets=[UserInfoDataset], message="test") + + assert ( + "Dataset `UserInfoDataset` has timestamp field set from assign preproc. Please either ref preproc or set a constant value." + == str(e.value) + ) + + +@mock +def test_invalid_assign_preproc(client): + + @source( + mysql.table( + "users", + cursor="added_on", + ), + every="1h", + disorder="20h", + bounded=True, + idleness="1h", + cdc="upsert", + preproc={"age": eval(col("val1"), schema={"val1": int})}, + ) + @meta(owner="test@test.com") + @dataset + class UserInfoDataset: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + timestamp: datetime = field(timestamp=True) + + # Setting timestamp field through assign preproc + with pytest.raises(ValueError) as e: + client.commit(datasets=[UserInfoDataset], message="test") + + assert ( + "Dataset `UserInfoDataset` has a source with a pre_proc value field `age`, but the field is not defined in the dataset." + == str(e.value) + ) + + +@mock +def test_invalid_eval_assign_preproc(client): + + @source( + mysql.table( + "users", + cursor="added_on", + ), + every="1h", + disorder="20h", + bounded=True, + idleness="1h", + cdc="upsert", + preproc={"age": eval(col("val1"), schema={"val1": float})}, + ) + @meta(owner="test@test.com") + @dataset + class UserInfoDataset: + user_id: int = field(key=True) + name: str + gender: str + # Users date of birth + dob: str + age: int + timestamp: datetime = field(timestamp=True) + + # Setting timestamp field through assign preproc + with pytest.raises(TypeError) as e: + client.commit(datasets=[UserInfoDataset], message="test") + + assert ( + "`age` is of type `int` in Dataset `UserInfoDataset`, can not be cast to `float`. 
Full expression: `col('val1')`" + == str(e.value) + ) diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index db67ff950..c9b23b39c 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2,7 +2,6 @@ import warnings import copy import datetime -import enum import functools import inspect import sys @@ -45,7 +44,6 @@ ExpDecaySum, ) from fennel.dtypes.dtypes import ( - FENNEL_STRUCT, get_fennel_struct, Window, Decimal, diff --git a/fennel/dtypes/dtypes.py b/fennel/dtypes/dtypes.py index 5c27930e0..253d9dbc0 100644 --- a/fennel/dtypes/dtypes.py +++ b/fennel/dtypes/dtypes.py @@ -11,7 +11,6 @@ get_args, ForwardRef, Any, - Optional, ) import google.protobuf.duration_pb2 as duration_proto # type: ignore @@ -19,6 +18,12 @@ import fennel.gen.schema_pb2 as schema_proto import fennel.gen.window_pb2 as window_proto +from fennel.internal_lib import ( + FENNEL_STRUCT, + FENNEL_STRUCT_DEPENDENCIES_SRC_CODE, + FENNEL_STRUCT_SRC_CODE, + META_FIELD, +) from fennel.internal_lib.duration import duration_to_timedelta from fennel.internal_lib.utils.utils import ( get_origin, @@ -26,11 +31,6 @@ as_json, dtype_to_string, ) -from fennel.lib.metadata.metadata import META_FIELD - -FENNEL_STRUCT = "__fennel_struct__" -FENNEL_STRUCT_SRC_CODE = "__fennel_struct_src_code__" -FENNEL_STRUCT_DEPENDENCIES_SRC_CODE = "__fennel_struct_dependencies_src_code__" def _contains_user_defined_class(annotation) -> bool: diff --git a/fennel/expr/expr.py b/fennel/expr/expr.py index bf3e6a627..1c5a3decf 100644 --- a/fennel/expr/expr.py +++ b/fennel/expr/expr.py @@ -1,17 +1,20 @@ from __future__ import annotations -from enum import Enum -from dataclasses import dataclass -from typing import Any, Callable, Dict, Type, Optional import json +from dataclasses import dataclass +from typing import Any, Callable, Dict, Type, Optional -from fennel.dtypes.dtypes import FENNEL_STRUCT import pandas as pd -from fennel.internal_lib.schema.schema import from_proto import pyarrow as pa from fennel_data_lib import eval, type_of -from fennel.internal_lib.schema import get_datatype + import fennel.gen.schema_pb2 as schema_proto +from fennel.internal_lib import FENNEL_STRUCT +from fennel.internal_lib.schema import ( + from_proto, + get_datatype, + cast_col_to_arrow_dtype, +) class InvalidExprException(Exception): @@ -319,63 +322,17 @@ def typeof(self, schema: Dict) -> Type: def eval(self, input_df: pd.DataFrame, schema: Dict) -> pd.Series: from fennel.expr.serializer import ExprSerializer - def convert_object(obj): - if isinstance(obj, list): - result = [convert_object(i) for i in obj] - elif isinstance(obj, dict): - result = {} - for key in obj: - result[key] = convert_object(obj[key]) - elif hasattr(obj, "as_json"): - result = obj.as_json() - else: - result = obj - return result - - def convert_objects(df): - for col in df.columns: - df[col] = df[col].apply(convert_object) - return df - - def pd_to_pa(pd_data, schema=None): - # Schema unspecified - as in the case with lookups - if not schema: - if isinstance(pd_data, pd.Series): - pd_data = pd_data.apply(convert_object) - return pa.Array.from_pandas(pd_data) - elif isinstance(pd_data, pd.DataFrame): - pd_data = convert_objects(pd_data) - return pa.RecordBatch.from_pandas( - pd_data, preserve_index=False + def pd_to_pa(pd_data: pd.DataFrame, schema: Dict[str, Type]): + new_df = pd_data.copy() + for column, dtype in schema.items(): + dtype = get_datatype(dtype) + if column not in new_df.columns: + raise InvalidExprException( + f"column : {column} not found in 
input dataframe but defined in schema." ) - else: - raise ValueError("only pd.Series or pd.Dataframe expected") - - # Single column expected - if isinstance(schema, pa.Field): - # extra columns may have been provided - if isinstance(pd_data, pd.DataFrame): - if schema.name not in pd_data: - raise ValueError( - f"Dataframe does not contain column {schema.name}" - ) - # df -> series - pd_data = pd_data[schema.name] - - if not isinstance(pd_data, pd.Series): - raise ValueError("only pd.Series or pd.Dataframe expected") - pd_data = pd_data.apply(convert_object) - return pa.Array.from_pandas(pd_data, type=schema.type) - - # Multiple columns case: use the columns we need - result_df = pd.DataFrame() - for col in schema.names: - if col not in pd_data: - raise ValueError(f"Dataframe does not contain column {col}") - result_df[col] = pd_data[col].apply(convert_object) - return pa.RecordBatch.from_pandas( - result_df, preserve_index=False, schema=schema - ) + new_df[column] = cast_col_to_arrow_dtype(new_df[column], dtype) + new_df = new_df.loc[:, list(schema.keys())] + return pa.RecordBatch.from_pandas(new_df, preserve_index=False) def pa_to_pd(pa_data): return pa_data.to_pandas(types_mapper=pd.ArrowDtype) @@ -383,7 +340,7 @@ def pa_to_pd(pa_data): serializer = ExprSerializer() proto_expr = serializer.serialize(self.root) proto_bytes = proto_expr.SerializeToString() - df_pa = pd_to_pa(input_df) + df_pa = pd_to_pa(input_df, schema) proto_schema = {} for key, value in schema.items(): proto_schema[key] = get_datatype(value).SerializeToString() @@ -780,6 +737,8 @@ def __str__(self) -> str: class FillNull(Expr): def __init__(self, expr: Expr, value: Any): self.expr = expr + self.fill = lit(value) + super(FillNull, self).__init__() self.value = make_expr(value) def __str__(self) -> str: diff --git a/fennel/expr/serializer.py b/fennel/expr/serializer.py index 10167fca2..8219e9b43 100644 --- a/fennel/expr/serializer.py +++ b/fennel/expr/serializer.py @@ -116,10 +116,10 @@ def visitIsNull(self, obj): return expr def visitFillNull(self, obj): - return "FILL NULL(%s, %s)" % ( - self.visit(obj.expr), - self.visit(obj.fill), - ) + expr = proto.Expr() + expr.fillnull.fill.CopyFrom(self.visit(obj.fill)) + expr.fillnull.operand.CopyFrom(self.visit(obj.expr)) + return expr def visitWhen(self, obj): expr = proto.Expr() diff --git a/fennel/expr/test_expr.py b/fennel/expr/test_expr.py index d3277659a..27a8191dd 100644 --- a/fennel/expr/test_expr.py +++ b/fennel/expr/test_expr.py @@ -65,8 +65,6 @@ class TestDataset: ret = expr.eval(df, {"a": int, "b": int}) assert ret.tolist() == [9, 11, 13, 15] - ret = expr.eval(df, TestDataset.schema()) - assert ret.tolist() == [9, 11, 13, 15] assert expr.typeof({"a": int, "b": int}) == int ref_extractor = FetchReferences() diff --git a/fennel/gen/connector_pb2.py b/fennel/gen/connector_pb2.py index 9b25323c8..5ae41e6b0 100644 --- a/fennel/gen/connector_pb2.py +++ b/fennel/gen/connector_pb2.py @@ -13,13 +13,14 @@ from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 +import fennel.gen.expr_pb2 as expr__pb2 import fennel.gen.kinesis_pb2 as kinesis__pb2 import fennel.gen.pycode_pb2 as pycode__pb2 import fennel.gen.schema_registry_pb2 as schema__registry__pb2 import fennel.gen.schema_pb2 as schema__pb2 -DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\"\x8c\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x42\t\n\x07variant\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"j\n\x05MySQL\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\"m\n\x08Postgres\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\"b\n\x02S3\x12\x1d\n\x15\x61ws_secret_access_key\x18\x01 \x01(\t\x12\x19\n\x11\x61ws_access_key_id\x18\x02 \x01(\t\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0b\n\t_role_arn\"O\n\x08\x42igquery\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x1b\n\x13service_account_key\x18\x02 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\t\"\x7f\n\tSnowflake\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\x10\n\x08password\x18\x03 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 
\x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\t\"\xfd\x01\n\x05Kafka\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12 \n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x88\x01\x01\x12 \n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x16\n\x14_sasl_plain_usernameB\x16\n\x14_sasl_plain_password\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"1\n\x0b\x43redentials\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x10\n\x08password\x18\x02 \x01(\t\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"G\n\x05Mongo\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\"9\n\x06PubSub\x12\x12\n\nproject_id\x18\x01 \x01(\t\x12\x1b\n\x13service_account_key\x18\x02 \x01(\t\"\xc0\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 
\x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"U\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x42\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xb1\x05\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filter\"\xee\x03\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 
\x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_until*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\nexpr.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\"\x8c\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x42\t\n\x07variant\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"j\n\x05MySQL\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\"m\n\x08Postgres\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 
\x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\t\"b\n\x02S3\x12\x1d\n\x15\x61ws_secret_access_key\x18\x01 \x01(\t\x12\x19\n\x11\x61ws_access_key_id\x18\x02 \x01(\t\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0b\n\t_role_arn\"O\n\x08\x42igquery\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x1b\n\x13service_account_key\x18\x02 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\t\"\x7f\n\tSnowflake\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0c\n\x04user\x18\x02 \x01(\t\x12\x10\n\x08password\x18\x03 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\t\"\xfd\x01\n\x05Kafka\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12 \n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x88\x01\x01\x12 \n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x16\n\x14_sasl_plain_usernameB\x16\n\x14_sasl_plain_password\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"1\n\x0b\x43redentials\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x10\n\x08password\x18\x02 \x01(\t\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"G\n\x05Mongo\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x10\n\x08password\x18\x04 \x01(\t\"9\n\x06PubSub\x12\x12\n\nproject_id\x18\x01 \x01(\t\x12\x1b\n\x13service_account_key\x18\x02 \x01(\t\"\xc0\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 
\x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xab\x02\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12\x39\n\x04\x65val\x18\x03 \x01(\x0b\x32).fennel.proto.connector.PreProcValue.EvalH\x00\x1a\x98\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12\'\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_typeB\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xb1\x05\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n 
\x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filter\"\xee\x03\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_until*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -34,90 +35,92 @@ _globals['_SOURCE_PREPROCENTRY']._serialized_options = b'8\001' _globals['_SINK_RENAMESENTRY']._options = None _globals['_SINK_RENAMESENTRY']._serialized_options = b'8\001' - _globals['_CDCSTRATEGY']._serialized_start=6540 - _globals['_CDCSTRATEGY']._serialized_end=6615 - _globals['_EXTDATABASE']._serialized_start=175 - _globals['_EXTDATABASE']._serialized_end=827 - _globals['_PUBSUBFORMAT']._serialized_start=829 - _globals['_PUBSUBFORMAT']._serialized_end=906 - _globals['_KAFKAFORMAT']._serialized_start=909 - _globals['_KAFKAFORMAT']._serialized_end=1097 - _globals['_JSONFORMAT']._serialized_start=1099 - _globals['_JSONFORMAT']._serialized_end=1111 - _globals['_AVROFORMAT']._serialized_start=1113 - _globals['_AVROFORMAT']._serialized_end=1196 - _globals['_PROTOBUFFORMAT']._serialized_start=1198 - _globals['_PROTOBUFFORMAT']._serialized_end=1285 - _globals['_REFERENCE']._serialized_start=1288 - _globals['_REFERENCE']._serialized_end=1510 - _globals['_REFERENCE_EXTDBTYPE']._serialized_start=1363 - _globals['_REFERENCE_EXTDBTYPE']._serialized_end=1510 - _globals['_WEBHOOK']._serialized_start=1512 - _globals['_WEBHOOK']._serialized_end=1581 - _globals['_MYSQL']._serialized_start=1583 - _globals['_MYSQL']._serialized_end=1689 - _globals['_POSTGRES']._serialized_start=1691 - _globals['_POSTGRES']._serialized_end=1800 - _globals['_S3']._serialized_start=1802 - _globals['_S3']._serialized_end=1900 - _globals['_BIGQUERY']._serialized_start=1902 - _globals['_BIGQUERY']._serialized_end=1981 - _globals['_SNOWFLAKE']._serialized_start=1983 - _globals['_SNOWFLAKE']._serialized_end=2110 - _globals['_KAFKA']._serialized_start=2113 - _globals['_KAFKA']._serialized_end=2366 - _globals['_KINESIS']._serialized_start=2368 - _globals['_KINESIS']._serialized_end=2395 - 
_globals['_CREDENTIALS']._serialized_start=2397 - _globals['_CREDENTIALS']._serialized_end=2446 - _globals['_REDSHIFTAUTHENTICATION']._serialized_start=2448 - _globals['_REDSHIFTAUTHENTICATION']._serialized_end=2573 - _globals['_REDSHIFT']._serialized_start=2576 - _globals['_REDSHIFT']._serialized_end=2729 - _globals['_MONGO']._serialized_start=2731 - _globals['_MONGO']._serialized_end=2802 - _globals['_PUBSUB']._serialized_start=2804 - _globals['_PUBSUB']._serialized_end=2861 - _globals['_EXTTABLE']._serialized_start=2864 - _globals['_EXTTABLE']._serialized_end=3568 - _globals['_MYSQLTABLE']._serialized_start=3570 - _globals['_MYSQLTABLE']._serialized_end=3651 - _globals['_POSTGRESTABLE']._serialized_start=3653 - _globals['_POSTGRESTABLE']._serialized_end=3775 - _globals['_S3TABLE']._serialized_start=3778 - _globals['_S3TABLE']._serialized_end=4025 - _globals['_KAFKATOPIC']._serialized_start=4028 - _globals['_KAFKATOPIC']._serialized_end=4157 - _globals['_BIGQUERYTABLE']._serialized_start=4159 - _globals['_BIGQUERYTABLE']._serialized_end=4243 - _globals['_SNOWFLAKETABLE']._serialized_start=4245 - _globals['_SNOWFLAKETABLE']._serialized_end=4330 - _globals['_WEBHOOKENDPOINT']._serialized_start=4333 - _globals['_WEBHOOKENDPOINT']._serialized_end=4462 - _globals['_KINESISSTREAM']._serialized_start=4465 - _globals['_KINESISSTREAM']._serialized_end=4676 - _globals['_REDSHIFTTABLE']._serialized_start=4678 - _globals['_REDSHIFTTABLE']._serialized_end=4762 - _globals['_PUBSUBTOPIC']._serialized_start=4765 - _globals['_PUBSUBTOPIC']._serialized_end=4899 - _globals['_PREPROCVALUE']._serialized_start=4901 - _globals['_PREPROCVALUE']._serialized_end=4986 - _globals['_MONGOCOLLECTION']._serialized_start=4988 - _globals['_MONGOCOLLECTION']._serialized_end=5079 - _globals['_SNAPSHOTDATA']._serialized_start=5081 - _globals['_SNAPSHOTDATA']._serialized_end=5131 - _globals['_INCREMENTAL']._serialized_start=5133 - _globals['_INCREMENTAL']._serialized_end=5146 - _globals['_RECREATE']._serialized_start=5148 - _globals['_RECREATE']._serialized_end=5158 - _globals['_STYLE']._serialized_start=5161 - _globals['_STYLE']._serialized_end=5349 - _globals['_SOURCE']._serialized_start=5352 - _globals['_SOURCE']._serialized_end=6041 - _globals['_SOURCE_PREPROCENTRY']._serialized_start=5922 - _globals['_SOURCE_PREPROCENTRY']._serialized_end=6006 - _globals['_SINK']._serialized_start=6044 - _globals['_SINK']._serialized_end=6538 - _globals['_SINK_RENAMESENTRY']._serialized_start=6456 - _globals['_SINK_RENAMESENTRY']._serialized_end=6502 + _globals['_CDCSTRATEGY']._serialized_start=6767 + _globals['_CDCSTRATEGY']._serialized_end=6842 + _globals['_EXTDATABASE']._serialized_start=187 + _globals['_EXTDATABASE']._serialized_end=839 + _globals['_PUBSUBFORMAT']._serialized_start=841 + _globals['_PUBSUBFORMAT']._serialized_end=918 + _globals['_KAFKAFORMAT']._serialized_start=921 + _globals['_KAFKAFORMAT']._serialized_end=1109 + _globals['_JSONFORMAT']._serialized_start=1111 + _globals['_JSONFORMAT']._serialized_end=1123 + _globals['_AVROFORMAT']._serialized_start=1125 + _globals['_AVROFORMAT']._serialized_end=1208 + _globals['_PROTOBUFFORMAT']._serialized_start=1210 + _globals['_PROTOBUFFORMAT']._serialized_end=1297 + _globals['_REFERENCE']._serialized_start=1300 + _globals['_REFERENCE']._serialized_end=1522 + _globals['_REFERENCE_EXTDBTYPE']._serialized_start=1375 + _globals['_REFERENCE_EXTDBTYPE']._serialized_end=1522 + _globals['_WEBHOOK']._serialized_start=1524 + _globals['_WEBHOOK']._serialized_end=1593 + 
_globals['_MYSQL']._serialized_start=1595 + _globals['_MYSQL']._serialized_end=1701 + _globals['_POSTGRES']._serialized_start=1703 + _globals['_POSTGRES']._serialized_end=1812 + _globals['_S3']._serialized_start=1814 + _globals['_S3']._serialized_end=1912 + _globals['_BIGQUERY']._serialized_start=1914 + _globals['_BIGQUERY']._serialized_end=1993 + _globals['_SNOWFLAKE']._serialized_start=1995 + _globals['_SNOWFLAKE']._serialized_end=2122 + _globals['_KAFKA']._serialized_start=2125 + _globals['_KAFKA']._serialized_end=2378 + _globals['_KINESIS']._serialized_start=2380 + _globals['_KINESIS']._serialized_end=2407 + _globals['_CREDENTIALS']._serialized_start=2409 + _globals['_CREDENTIALS']._serialized_end=2458 + _globals['_REDSHIFTAUTHENTICATION']._serialized_start=2460 + _globals['_REDSHIFTAUTHENTICATION']._serialized_end=2585 + _globals['_REDSHIFT']._serialized_start=2588 + _globals['_REDSHIFT']._serialized_end=2741 + _globals['_MONGO']._serialized_start=2743 + _globals['_MONGO']._serialized_end=2814 + _globals['_PUBSUB']._serialized_start=2816 + _globals['_PUBSUB']._serialized_end=2873 + _globals['_EXTTABLE']._serialized_start=2876 + _globals['_EXTTABLE']._serialized_end=3580 + _globals['_MYSQLTABLE']._serialized_start=3582 + _globals['_MYSQLTABLE']._serialized_end=3663 + _globals['_POSTGRESTABLE']._serialized_start=3665 + _globals['_POSTGRESTABLE']._serialized_end=3787 + _globals['_S3TABLE']._serialized_start=3790 + _globals['_S3TABLE']._serialized_end=4037 + _globals['_KAFKATOPIC']._serialized_start=4040 + _globals['_KAFKATOPIC']._serialized_end=4169 + _globals['_BIGQUERYTABLE']._serialized_start=4171 + _globals['_BIGQUERYTABLE']._serialized_end=4255 + _globals['_SNOWFLAKETABLE']._serialized_start=4257 + _globals['_SNOWFLAKETABLE']._serialized_end=4342 + _globals['_WEBHOOKENDPOINT']._serialized_start=4345 + _globals['_WEBHOOKENDPOINT']._serialized_end=4474 + _globals['_KINESISSTREAM']._serialized_start=4477 + _globals['_KINESISSTREAM']._serialized_end=4688 + _globals['_REDSHIFTTABLE']._serialized_start=4690 + _globals['_REDSHIFTTABLE']._serialized_end=4774 + _globals['_PUBSUBTOPIC']._serialized_start=4777 + _globals['_PUBSUBTOPIC']._serialized_end=4911 + _globals['_PREPROCVALUE']._serialized_start=4914 + _globals['_PREPROCVALUE']._serialized_end=5213 + _globals['_PREPROCVALUE_EVAL']._serialized_start=5050 + _globals['_PREPROCVALUE_EVAL']._serialized_end=5202 + _globals['_MONGOCOLLECTION']._serialized_start=5215 + _globals['_MONGOCOLLECTION']._serialized_end=5306 + _globals['_SNAPSHOTDATA']._serialized_start=5308 + _globals['_SNAPSHOTDATA']._serialized_end=5358 + _globals['_INCREMENTAL']._serialized_start=5360 + _globals['_INCREMENTAL']._serialized_end=5373 + _globals['_RECREATE']._serialized_start=5375 + _globals['_RECREATE']._serialized_end=5385 + _globals['_STYLE']._serialized_start=5388 + _globals['_STYLE']._serialized_end=5576 + _globals['_SOURCE']._serialized_start=5579 + _globals['_SOURCE']._serialized_end=6268 + _globals['_SOURCE_PREPROCENTRY']._serialized_start=6149 + _globals['_SOURCE_PREPROCENTRY']._serialized_end=6233 + _globals['_SINK']._serialized_start=6271 + _globals['_SINK']._serialized_end=6765 + _globals['_SINK_RENAMESENTRY']._serialized_start=6683 + _globals['_SINK_RENAMESENTRY']._serialized_end=6729 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/connector_pb2.pyi b/fennel/gen/connector_pb2.pyi index 1582eb975..c9550885a 100644 --- a/fennel/gen/connector_pb2.pyi +++ b/fennel/gen/connector_pb2.pyi @@ -13,6 +13,7 @@ database """ import builtins import 
collections.abc +import expr_pb2 import google.protobuf.descriptor import google.protobuf.duration_pb2 import google.protobuf.internal.containers @@ -900,20 +901,48 @@ global___PubSubTopic = PubSubTopic class PreProcValue(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing_extensions.final + class Eval(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SCHEMA_FIELD_NUMBER: builtins.int + EXPR_FIELD_NUMBER: builtins.int + PYCODE_FIELD_NUMBER: builtins.int + @property + def schema(self) -> schema_pb2.Schema: ... + @property + def expr(self) -> expr_pb2.Expr: ... + @property + def pycode(self) -> pycode_pb2.PyCode: ... + def __init__( + self, + *, + schema: schema_pb2.Schema | None = ..., + expr: expr_pb2.Expr | None = ..., + pycode: pycode_pb2.PyCode | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["eval_type", b"eval_type", "expr", b"expr", "pycode", b"pycode", "schema", b"schema"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["eval_type", b"eval_type", "expr", b"expr", "pycode", b"pycode", "schema", b"schema"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["eval_type", b"eval_type"]) -> typing_extensions.Literal["expr", "pycode"] | None: ... + REF_FIELD_NUMBER: builtins.int VALUE_FIELD_NUMBER: builtins.int + EVAL_FIELD_NUMBER: builtins.int ref: builtins.str @property def value(self) -> schema_pb2.Value: ... + @property + def eval(self) -> global___PreProcValue.Eval: ... def __init__( self, *, ref: builtins.str = ..., value: schema_pb2.Value | None = ..., + eval: global___PreProcValue.Eval | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["ref", b"ref", "value", b"value", "variant", b"variant"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["ref", b"ref", "value", b"value", "variant", b"variant"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["ref", "value"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["eval", b"eval", "ref", b"ref", "value", b"value", "variant", b"variant"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["eval", b"eval", "ref", b"ref", "value", b"value", "variant", b"variant"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["ref", "value", "eval"] | None: ... 
global___PreProcValue = PreProcValue diff --git a/fennel/internal_lib/__init__.py b/fennel/internal_lib/__init__.py index e69de29bb..fa0bc1e5f 100644 --- a/fennel/internal_lib/__init__.py +++ b/fennel/internal_lib/__init__.py @@ -0,0 +1,7 @@ +# Defining global fields here +FENNEL_STRUCT = "__fennel_struct__" +FENNEL_STRUCT_SRC_CODE = "__fennel_struct_src_code__" +FENNEL_STRUCT_DEPENDENCIES_SRC_CODE = "__fennel_struct_dependencies_src_code__" +EMAIL_REGEX = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" +META_FIELD = "__fennel_metadata__" +OWNER = "__owner__" diff --git a/fennel/internal_lib/schema/__init__.py b/fennel/internal_lib/schema/__init__.py index 2d833e846..fb8ec0a78 100644 --- a/fennel/internal_lib/schema/__init__.py +++ b/fennel/internal_lib/schema/__init__.py @@ -13,4 +13,6 @@ data_schema_check, validate_field_in_df, convert_dtype_to_arrow_type, + from_proto, + cast_col_to_arrow_dtype, ) diff --git a/fennel/internal_lib/schema/schema.py b/fennel/internal_lib/schema/schema.py index 47e5d7ef2..79f5b1085 100644 --- a/fennel/internal_lib/schema/schema.py +++ b/fennel/internal_lib/schema/schema.py @@ -1,10 +1,11 @@ from __future__ import annotations -from typing import Dict + import dataclasses import re import typing from datetime import datetime, date from decimal import Decimal as PythonDecimal +from typing import Dict from typing import ( Union, Any, @@ -30,7 +31,12 @@ _Embedding, _Decimal, ) -from fennel.internal_lib.utils.utils import get_origin, is_user_defined_class +from fennel.internal_lib.utils.utils import ( + get_origin, + is_user_defined_class, + parse_struct_into_dict, + parse_datetime_in_value, +) FENNEL_STRUCT = "__fennel_struct__" FENNEL_STRUCT_SRC_CODE = "__fennel_struct_src_code__" @@ -914,4 +920,34 @@ def from_proto(data_type: schema_proto.DataType) -> Any: else: raise ValueError(f"Unsupported data type field: {field}") - return None + +def cast_col_to_arrow_dtype( + series: pd.Series, dtype: schema_proto.DataType +) -> pd.Series: + """ + This function casts dtype of pd.Series object into pd.ArrowDtype depending on the DataType proto. 
+ """ + if not dtype.HasField("optional_type"): + if series.isnull().any(): + raise ValueError("Null values found in non-optional field.") + + # Let's convert structs into json, this is done because arrow + # dtype conversion fails with fennel struct + if check_dtype_has_struct_type(dtype): + series = series.apply(lambda x: parse_struct_into_dict(x)) + # Parse datetime values + series = series.apply(lambda x: parse_datetime_in_value(x, dtype)) + arrow_type = convert_dtype_to_arrow_type(dtype) + return series.astype(pd.ArrowDtype(arrow_type)) + + +def check_dtype_has_struct_type(dtype: schema_proto.DataType) -> bool: + if dtype.HasField("struct_type"): + return True + elif dtype.HasField("optional_type"): + return check_dtype_has_struct_type(dtype.optional_type.of) + elif dtype.HasField("array_type"): + return check_dtype_has_struct_type(dtype.array_type.of) + elif dtype.HasField("map_type"): + return check_dtype_has_struct_type(dtype.map_type.value) + return False diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py index 1c566a0d2..070280153 100644 --- a/fennel/internal_lib/to_proto/to_proto.py +++ b/fennel/internal_lib/to_proto/to_proto.py @@ -21,7 +21,6 @@ from google.protobuf.wrappers_pb2 import StringValue import fennel.connectors as connectors -from fennel.expr.serializer import ExprSerializer import fennel.gen.connector_pb2 as connector_proto import fennel.gen.dataset_pb2 as ds_proto import fennel.gen.expectations_pb2 as exp_proto @@ -40,12 +39,19 @@ indices_from_ds, ) from fennel.dtypes.dtypes import FENNEL_STRUCT +from fennel.expr.expr import Expr, TypedExpr +from fennel.expr.serializer import ExprSerializer +from fennel.expr.visitor import ExprPrinter from fennel.featuresets import Featureset, Feature, Extractor, ExtractorType from fennel.internal_lib.duration import ( Duration, duration_to_timedelta, ) -from fennel.internal_lib.schema import get_datatype, validate_val_with_dtype +from fennel.internal_lib.schema import ( + get_datatype, + validate_val_with_dtype, + get_python_type_from_pd, +) from fennel.internal_lib.to_proto import Serializer from fennel.internal_lib.to_proto.source_code import ( get_featureset_core_code, @@ -55,6 +61,7 @@ wrap_function, ) from fennel.internal_lib.to_proto.source_code import to_includes_proto +from fennel.internal_lib.utils import dtype_to_string from fennel.lib.includes import FENNEL_INCLUDED_MOD from fennel.lib.metadata import get_metadata_proto, get_meta_attr, OWNER from fennel.utils import fennel_get_source @@ -298,7 +305,6 @@ def sync_validation_for_datasets(ds: Dataset, env: Optional[str] = None): for pipeline in ds._pipelines: if pipeline.env.is_entity_selected(env): pipelines.append(pipeline) - sources = source_from_ds(ds, env) if len(sources) > 0 and len(pipelines) > 0: raise ValueError( @@ -419,6 +425,8 @@ def expectations_from_ds(ds: Dataset) -> List[exp_proto.Expectations]: def _validate_source_pre_proc( pre_proc: Dict[str, connectors.PreProcValue], ds: Dataset ): + timestamp_field = ds.timestamp_field + dataset_schema = ds.schema() for field_name, pre_proc_val in pre_proc.items(): ds_field = None for field in ds._fields: @@ -427,18 +435,58 @@ def _validate_source_pre_proc( break if not ds_field: raise ValueError( - f"Dataset {ds._name} has a source with a pre_proc value set for field {field_name}, " + f"Dataset `{ds._name}` has a source with a pre_proc value field `{field_name}`, " "but the field is not defined in the dataset." 
) # If the pre_proc is a `ref`, then skip - we can't check the value type or if the exists in the dataset if isinstance(pre_proc_val, connectors.Ref): continue - # Else check that the data type matches the field type - if validate_val_with_dtype(ds_field.dtype, pre_proc_val): # type: ignore - raise ValueError( - f"Dataset {ds._name} has a source with a pre_proc value set for field {field_name}, " - f"but the field type does not match the pre_proc value {pre_proc_val} type." - ) + elif isinstance(pre_proc_val, connectors.Eval): + # Cannot set timestamp field through preproc eval + if field_name == timestamp_field: + raise ValueError( + f"Dataset `{ds._name}` has its timestamp field set via assign preproc. Please either use ref " + f"preproc or set a constant value." + ) + + # Run output col dtype validation for Expr. + if isinstance(pre_proc_val.eval_type, (Expr, TypedExpr)): + expr = ( + pre_proc_val.eval_type + if isinstance(pre_proc_val.eval_type, Expr) + else pre_proc_val.eval_type.expr + ) + typed_dtype = ( + None + if isinstance(pre_proc_val.eval_type, Expr) + else pre_proc_val.eval_type.dtype + ) + input_schema = ds.schema() + if pre_proc_val.additional_schema: + for name, dtype in pre_proc_val.additional_schema.items(): + input_schema[name] = dtype + expr_type = expr.typeof(input_schema) + if expr_type != get_python_type_from_pd( + dataset_schema[field_name] + ): + printer = ExprPrinter() + raise TypeError( + f"`{field_name}` is of type `{dtype_to_string(dataset_schema[field_name])}` in Dataset `{ds._name}`, " + f"cannot be cast to `{dtype_to_string(expr_type)}`. Full expression: `{printer.print(expr.root)}`" + ) + if typed_dtype and expr_type != typed_dtype: + printer = ExprPrinter() + raise TypeError( + f"`{field_name}` in Dataset `{ds._name}` cannot be cast to `{dtype_to_string(typed_dtype)}`, " + f"evaluated dtype is `{dtype_to_string(expr_type)}`. Full expression: `{printer.print(expr.root)}`" + ) + else: + # Else check that the data type matches the field type + if validate_val_with_dtype(ds_field.dtype, pre_proc_val): # type: ignore + raise ValueError( + f"Dataset `{ds._name}` has a source with a pre_proc value set for field `{field_name}`, " + f"but the field type does not match the pre_proc value `{pre_proc_val}` type." + ) def source_from_ds( @@ -701,57 +749,95 @@ def _to_field_lookup_proto( def _pre_proc_value_to_proto( + dataset: Dataset, + column_name: str, pre_proc_value: connectors.PreProcValue, ) -> connector_proto.PreProcValue: if isinstance(pre_proc_value, connectors.Ref): return connector_proto.PreProcValue( ref=pre_proc_value.name, ) - # Get the dtype of the pre_proc value. - # - # NOTE: We use the same the utility used to serialize the field types so that the types and their - # serialization are consistent.
- dtype = get_datatype(type(pre_proc_value)) - if dtype == schema_proto.DataType(string_type=schema_proto.StringType()): - return connector_proto.PreProcValue( - value=schema_proto.Value(string=pre_proc_value) - ) - elif dtype == schema_proto.DataType(bool_type=schema_proto.BoolType()): - return connector_proto.PreProcValue( - value=schema_proto.Value(bool=pre_proc_value) - ) - elif dtype == schema_proto.DataType(int_type=schema_proto.IntType()): - return connector_proto.PreProcValue( - value=schema_proto.Value(int=pre_proc_value) - ) - elif dtype == schema_proto.DataType(double_type=schema_proto.DoubleType()): - return connector_proto.PreProcValue( - value=schema_proto.Value(float=pre_proc_value) - ) - elif dtype == schema_proto.DataType( - timestamp_type=schema_proto.TimestampType() - ): - ts = Timestamp() - ts.FromDatetime(pre_proc_value) - return connector_proto.PreProcValue( - value=schema_proto.Value(timestamp=ts) - ) - # TODO(mohit): Extend the pre_proc value proto to support other types + elif isinstance(pre_proc_value, connectors.Eval): + if pre_proc_value.additional_schema: + fields = [ + schema_proto.Field(name=name, dtype=get_datatype(dtype)) + for (name, dtype) in pre_proc_value.additional_schema.items() + ] + schema = schema_proto.Schema(fields=fields) + else: + schema = None + + if isinstance(pre_proc_value.eval_type, (Expr, TypedExpr)): + serializer = ExprSerializer() + if isinstance(pre_proc_value.eval_type, Expr): + root = pre_proc_value.eval_type.root + else: + root = pre_proc_value.eval_type.expr.root + return connector_proto.PreProcValue( + eval=connector_proto.PreProcValue.Eval( + schema=schema, + expr=serializer.serialize(root), + ) + ) + else: + return connector_proto.PreProcValue( + eval=connector_proto.PreProcValue.Eval( + schema=schema, + pycode=_preproc_assign_to_pycode( + dataset, column_name, pre_proc_value.eval_type + ), + ) + ) else: - raise ValueError( - f"PreProc value {pre_proc_value} is of type {dtype}, " - f"which is not currently not supported." - ) + # Get the dtype of the pre_proc value. + # + # NOTE: We use the same utility used to serialize the field types so that the types and their + # serialization are consistent. + dtype = get_datatype(type(pre_proc_value)) + if dtype == schema_proto.DataType( + string_type=schema_proto.StringType() + ): + return connector_proto.PreProcValue( + value=schema_proto.Value(string=pre_proc_value) + ) + elif dtype == schema_proto.DataType(bool_type=schema_proto.BoolType()): + return connector_proto.PreProcValue( + value=schema_proto.Value(bool=pre_proc_value) + ) + elif dtype == schema_proto.DataType(int_type=schema_proto.IntType()): + return connector_proto.PreProcValue( + value=schema_proto.Value(int=pre_proc_value) + ) + elif dtype == schema_proto.DataType( + double_type=schema_proto.DoubleType() + ): + return connector_proto.PreProcValue( + value=schema_proto.Value(float=pre_proc_value) + ) + elif dtype == schema_proto.DataType( + timestamp_type=schema_proto.TimestampType() + ): + ts = Timestamp() + ts.FromDatetime(pre_proc_value) + return connector_proto.PreProcValue( + value=schema_proto.Value(timestamp=ts) + ) + # TODO(mohit): Extend the pre_proc value proto to support other types + else: + raise ValueError( + f"PreProc value {pre_proc_value} is of type {dtype}, " + f"which is currently not supported."
+ ) def _pre_proc_to_proto( - pre_proc: Optional[Dict[str, connectors.PreProcValue]] + dataset: Dataset, pre_proc: Optional[Dict[str, connectors.PreProcValue]] ) -> Mapping[str, connector_proto.PreProcValue]: if pre_proc is None: return {} proto_pre_proc = {} for k, v in pre_proc.items(): - proto_pre_proc[k] = _pre_proc_value_to_proto(v) + proto_pre_proc[k] = _pre_proc_value_to_proto(dataset, k, v) return proto_pre_proc @@ -769,6 +855,19 @@ def _preproc_filter_to_pycode( ) +def _preproc_assign_to_pycode( + dataset: Dataset, column_name: str, func: Callable +) -> pycode_proto.PyCode: + return wrap_function( + dataset._name, + get_dataset_core_code(dataset), + "", + to_includes_proto(func), + is_assign=True, + column_name=column_name, + ) + + def _conn_to_source_proto( connector: connectors.DataConnector, dataset: Dataset, @@ -832,7 +931,7 @@ def _webhook_to_source_proto( dataset=dataset._name, ds_version=dataset._version, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) @@ -873,7 +972,7 @@ def _kafka_conn_to_source_proto( dataset=dataset._name, ds_version=dataset._version, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, @@ -976,7 +1075,7 @@ def _s3_conn_to_source_proto( cursor=None, timestamp_field=dataset.timestamp_field, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) @@ -1171,7 +1270,7 @@ def _bigquery_conn_to_source_proto( until=_to_timestamp_proto(connector.until), timestamp_field=dataset.timestamp_field, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) @@ -1243,7 +1342,7 @@ def _redshift_conn_to_source_proto( disorder=to_duration_proto(connector.disorder), timestamp_field=dataset.timestamp_field, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, @@ -1339,7 +1438,7 @@ def _mongo_conn_to_source_proto( disorder=to_duration_proto(connector.disorder), timestamp_field=dataset.timestamp_field, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, @@ -1411,7 +1510,7 @@ def _pubsub_conn_to_source_proto( dataset=dataset._name, ds_version=dataset._version, cdc=to_cdc_proto(connector.cdc), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) @@ -1454,7 +1553,7 @@ def _snowflake_conn_to_source_proto( disorder=to_duration_proto(connector.disorder), timestamp_field=dataset.timestamp_field, cdc=to_cdc_proto(connector.cdc), - 
pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, @@ -1538,7 +1637,7 @@ def _mysql_conn_to_source_proto( cdc=to_cdc_proto(connector.cdc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) @@ -1629,7 +1728,7 @@ def _pg_conn_to_source_proto( cdc=to_cdc_proto(connector.cdc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) @@ -1716,7 +1815,7 @@ def _kinesis_conn_to_source_proto( cdc=to_cdc_proto(connector.cdc), starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), - pre_proc=_pre_proc_to_proto(connector.pre_proc), + pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, idleness=( to_duration_proto(connector.idleness) diff --git a/fennel/internal_lib/utils/utils.py b/fennel/internal_lib/utils/utils.py index 200bcbdc4..ea675a7a0 100644 --- a/fennel/internal_lib/utils/utils.py +++ b/fennel/internal_lib/utils/utils.py @@ -1,11 +1,14 @@ import dataclasses from datetime import datetime from decimal import Decimal -from typing import Any, Union +from typing import Any, Union, Dict +import numpy as np import pandas as pd +from frozendict import frozendict from fennel.gen.schema_pb2 import DataType +from fennel.internal_lib import FENNEL_STRUCT def _get_args(type_: Any) -> Any: @@ -124,3 +127,79 @@ def cast_col_to_pandas( ) else: return series.fillna(pd.NA) + + +def parse_struct_into_dict(value: Any) -> Union[dict, list]: + """ + This function assumes that there's a struct somewhere in the value that needs to be converted into json. + """ + if hasattr(value, FENNEL_STRUCT): + try: + return value.as_json() + except Exception as e: + raise TypeError( + f"Not able to parse value: {value} into json, error: {e}" + ) + elif isinstance(value, list) or isinstance(value, np.ndarray): + return [parse_struct_into_dict(x) for x in value] + elif isinstance(value, dict) or isinstance(value, frozendict): + return {key: parse_struct_into_dict(val) for key, val in value.items()} + else: + return value + + +def parse_datetime_in_value( + value: Any, dtype: DataType, nullable: bool = False +) -> Any: + """ + Recursively parses datetime values inside the given value, guided by the DataType proto.
+ """ + if nullable: + try: + if not isinstance( + value, (list, tuple, dict, set, np.ndarray, frozendict) + ) and pd.isna(value): + return pd.NA + # ValueError error occurs when you do something like pd.isnull([1, 2, None]) + except ValueError: + pass + if dtype.HasField("optional_type"): + return parse_datetime_in_value(value, dtype.optional_type.of, True) + elif dtype.HasField("timestamp_type"): + return parse_datetime(value) + elif dtype.HasField("array_type"): + return [parse_datetime_in_value(x, dtype.array_type.of) for x in value] + elif dtype.HasField("map_type"): + if isinstance(value, (dict, frozendict)): + return { + key: parse_datetime_in_value(value, dtype.array_type.of) + for (key, value) in value.items() + } + elif isinstance(value, (list, np.ndarray)): + return [ + (key, parse_datetime_in_value(value, dtype.array_type.of)) + for (key, value) in value + ] + else: + return value + elif dtype.HasField("struct_type"): + if hasattr(value, FENNEL_STRUCT): + try: + value = value.as_json() + except Exception as e: + raise TypeError( + f"Not able parse value: {value} into json, error: {e}" + ) + output: Dict[Any, Any] = {} + for field in dtype.struct_type.fields: + dtype = field.dtype + name = field.name + if not dtype.HasField("optional_type") and name not in value: + raise ValueError( + f"value not found for non optional field : {field}" + ) + if name in value: + output[name] = parse_datetime_in_value(value[name], dtype) + return output + else: + return value diff --git a/fennel/lib/metadata/metadata.py b/fennel/lib/metadata/metadata.py index 7f9990763..23eadc728 100644 --- a/fennel/lib/metadata/metadata.py +++ b/fennel/lib/metadata/metadata.py @@ -20,10 +20,7 @@ import fennel.gen.metadata_pb2 as proto from fennel._vendor.pydantic import BaseModel, validator # type: ignore - -EMAIL_REGEX = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" -META_FIELD = "__fennel_metadata__" -OWNER = "__owner__" +from fennel.internal_lib import EMAIL_REGEX, META_FIELD, OWNER def desc(obj): diff --git a/fennel/testing/data_engine.py b/fennel/testing/data_engine.py index b89913878..e2ffde8bc 100644 --- a/fennel/testing/data_engine.py +++ b/fennel/testing/data_engine.py @@ -1,8 +1,7 @@ import copy import logging -import types import math - +import types from collections import defaultdict from dataclasses import dataclass from dataclasses import field @@ -31,16 +30,16 @@ Pipeline, Quantile, Stddev, - ExpDecaySum, ) from fennel.datasets.datasets import ( get_index, IndexDuration, ) +from fennel.expr.expr import Expr, TypedExpr from fennel.gen import schema_pb2 as schema_proto from fennel.gen.dataset_pb2 import CoreDataset from fennel.internal_lib.duration import Duration, duration_to_timedelta -from fennel.internal_lib.schema import data_schema_check +from fennel.internal_lib.schema import data_schema_check, get_datatype from fennel.internal_lib.to_proto import ( dataset_to_proto, get_dataset_core_code, @@ -59,6 +58,8 @@ FENNEL_LOOKUP, FENNEL_TIMESTAMP, FENNEL_ORDER, + cast_col_to_arrow_dtype, + cast_col_to_pandas_dtype, ) TEST_PORT = 50051 @@ -89,9 +90,17 @@ def internal_webhook_endpoint(dataset: Dataset): def _preproc_df( - df: pd.DataFrame, pre_proc: Dict[str, connectors.PreProcValue] + dataset: Dataset, + df: pd.DataFrame, + pre_proc: Dict[str, connectors.PreProcValue], ) -> pd.DataFrame: new_df = df.copy() + schema = dataset.schema() + # Remove the fields that are created in preproc + for col in pre_proc.keys(): + if col in schema: + del schema[col] + for col, pre_proc_value in pre_proc.items(): 
if isinstance(pre_proc_value, connectors.Ref): col_name = pre_proc_value.name @@ -100,6 +109,61 @@ def _preproc_df( f"Referenced column {col_name} not found in dataframe" ) new_df[col] = df[col_name] + elif isinstance(pre_proc_value, connectors.Eval): + input_schema = copy.deepcopy(schema) + + # try casting the column in dataframe according to schema defined in additional schema in eval preproc + if pre_proc_value.additional_schema: + for name, dtype in pre_proc_value.additional_schema.items(): + if name not in df.columns: + raise ValueError( + f"Field `{name}` defined in schema for eval preproc not found in dataframe." + ) + try: + new_df[name] = cast_col_to_arrow_dtype( + new_df[name], get_datatype(dtype) + ) + new_df[name] = cast_col_to_pandas_dtype( + new_df[name], get_datatype(dtype) + ) + except Exception as e: + raise ValueError( + f"Casting column in dataframe for field `{name}` defined in schema for eval preproc failed : {e}" + ) + input_schema[name] = dtype + + try: + if isinstance(pre_proc_value.eval_type, Expr): + new_df[col] = pre_proc_value.eval_type.eval( + new_df, input_schema + ) + elif isinstance(pre_proc_value.eval_type, TypedExpr): + new_df[col] = pre_proc_value.eval_type.expr.eval( + new_df, input_schema + ) + else: + assign_func_pycode = wrap_function( + dataset._name, + get_dataset_core_code(dataset), + "", + to_includes_proto(pre_proc_value.eval_type), + column_name=col, + is_assign=True, + ) + assign_df = new_df.copy() + mod = types.ModuleType(assign_func_pycode.entry_point) + code = ( + assign_func_pycode.imports + + "\n" + + assign_func_pycode.generated_code + ) + exec(code, mod.__dict__) + func = mod.__dict__[assign_func_pycode.entry_point] + new_df = func(assign_df) + except Exception as e: + raise Exception( + f"Error in assign preproc for field `{col}` for dataset `{dataset._name}`: {e} " + ) else: if isinstance(pre_proc_value, datetime): new_df[col] = parse_datetime(pre_proc_value) @@ -329,7 +393,9 @@ def log( len(preproc) == 1 ), f"Multiple preproc found for {ds} and {webhook_endpoint}" try: - df = _preproc_df(df, preproc[0]) + df = _preproc_df( + self.datasets[ds].dataset, df, preproc[0] + ) except ValueError as e: raise ValueError( f"Error using pre_proc for dataset `{ds}`: {str(e)}", diff --git a/fennel/testing/test_cast_df_to_schema.py b/fennel/testing/test_cast_df_to_schema.py index ab420cc66..1589e4492 100644 --- a/fennel/testing/test_cast_df_to_schema.py +++ b/fennel/testing/test_cast_df_to_schema.py @@ -11,12 +11,12 @@ from fennel.gen import schema_pb2 as schema_proto from fennel.internal_lib.schema import get_datatype from fennel.internal_lib.to_proto.to_proto import fields_to_dsschema +from fennel.internal_lib.utils.utils import parse_datetime_in_value from fennel.testing import mock from fennel.testing.test_utils import ( cast_df_to_schema, cast_col_to_arrow_dtype, cast_col_to_pandas_dtype, - parse_datetime_in_value, ) __owner__ = "nitin@fennel.ai" diff --git a/fennel/testing/test_utils.py b/fennel/testing/test_utils.py index 40951c832..74d9d606a 100644 --- a/fennel/testing/test_utils.py +++ b/fennel/testing/test_utils.py @@ -1,21 +1,20 @@ import json from math import isnan -from typing import Any, Union, List, Dict +from typing import Any, List import numpy as np import pandas as pd import pyarrow as pa -from frozendict import frozendict from google.protobuf.json_format import MessageToDict from fennel._vendor import jsondiff # type: ignore from fennel._vendor.requests import Response # type: ignore -from fennel.dtypes.dtypes import 
FENNEL_STRUCT, Window +from fennel.dtypes.dtypes import Window from fennel.gen.dataset_pb2 import Operator, Filter, Transform, Assign from fennel.gen.featureset_pb2 import Extractor from fennel.gen.pycode_pb2 import PyCode from fennel.gen.schema_pb2 import DSSchema, DataType, Field, TimestampType -from fennel.internal_lib.schema import convert_dtype_to_arrow_type, get_datatype +from fennel.internal_lib.schema import get_datatype, cast_col_to_arrow_dtype from fennel.internal_lib.utils import parse_datetime FENNEL_DELETE_TIMESTAMP = "__fennel_delete_timestamp__" @@ -110,112 +109,6 @@ def almost_equal(a: float, b: float, epsilon: float = 1e-6) -> bool: return abs(a - b) < epsilon -def check_dtype_has_struct_type(dtype: DataType) -> bool: - if dtype.HasField("struct_type"): - return True - elif dtype.HasField("optional_type"): - return check_dtype_has_struct_type(dtype.optional_type.of) - elif dtype.HasField("array_type"): - return check_dtype_has_struct_type(dtype.array_type.of) - elif dtype.HasField("map_type"): - return check_dtype_has_struct_type(dtype.map_type.value) - return False - - -def parse_struct_into_dict(value: Any) -> Union[dict, list]: - """ - This function assumes that there's a struct somewhere in the value that needs to be converted into json. - """ - if hasattr(value, FENNEL_STRUCT): - try: - return value.as_json() - except Exception as e: - raise TypeError( - f"Not able parse value: {value} into json, error: {e}" - ) - elif isinstance(value, list) or isinstance(value, np.ndarray): - return [parse_struct_into_dict(x) for x in value] - elif isinstance(value, dict) or isinstance(value, frozendict): - return {key: parse_struct_into_dict(val) for key, val in value.items()} - else: - return value - - -def parse_datetime_in_value( - value: Any, dtype: DataType, nullable: bool = False -) -> Any: - """ - This function assumes that there's a struct somewhere in the value that needs to be converted into json. 
- """ - if nullable: - try: - if not isinstance( - value, (list, tuple, dict, set, np.ndarray, frozendict) - ) and pd.isna(value): - return pd.NA - # ValueError error occurs when you do something like pd.isnull([1, 2, None]) - except ValueError: - pass - if dtype.HasField("optional_type"): - return parse_datetime_in_value(value, dtype.optional_type.of, True) - elif dtype.HasField("timestamp_type"): - return parse_datetime(value) - elif dtype.HasField("array_type"): - return [parse_datetime_in_value(x, dtype.array_type.of) for x in value] - elif dtype.HasField("map_type"): - if isinstance(value, (dict, frozendict)): - return { - key: parse_datetime_in_value(value, dtype.array_type.of) - for (key, value) in value.items() - } - elif isinstance(value, (list, np.ndarray)): - return [ - (key, parse_datetime_in_value(value, dtype.array_type.of)) - for (key, value) in value - ] - else: - return value - elif dtype.HasField("struct_type"): - if hasattr(value, FENNEL_STRUCT): - try: - value = value.as_json() - except Exception as e: - raise TypeError( - f"Not able parse value: {value} into json, error: {e}" - ) - output: Dict[Any, Any] = {} - for field in dtype.struct_type.fields: - dtype = field.dtype - name = field.name - if not dtype.HasField("optional_type") and name not in value: - raise ValueError( - f"value not found for non optional field : {field}" - ) - if name in value: - output[name] = parse_datetime_in_value(value[name], dtype) - return output - else: - return value - - -def cast_col_to_arrow_dtype(series: pd.Series, dtype: DataType) -> pd.Series: - """ - This function casts dtype of pd.Series object into pd.ArrowDtype depending on the DataType proto. - """ - if not dtype.HasField("optional_type"): - if series.isnull().any(): - raise ValueError("Null values found in non-optional field.") - - # Let's convert structs into json, this is done because arrow - # dtype conversion fails with fennel struct - if check_dtype_has_struct_type(dtype): - series = series.apply(lambda x: parse_struct_into_dict(x)) - # Parse datetime values - series = series.apply(lambda x: parse_datetime_in_value(x, dtype)) - arrow_type = convert_dtype_to_arrow_type(dtype) - return series.astype(pd.ArrowDtype(arrow_type)) - - def cast_col_to_pandas_dtype( series: pd.Series, dtype: DataType, nullable: bool = False ) -> pd.Series: @@ -397,8 +290,8 @@ def cast_df_to_schema( """ # Handle fields in keys and values fields = list(dsschema.keys.fields) + list(dsschema.values.fields) + columns = [field.name for field in fields] + [dsschema.timestamp] df = df.copy() - df = df.reset_index(drop=True) for f in fields: if f.name not in df.columns: raise ValueError( @@ -424,7 +317,7 @@ def cast_df_to_schema( raise ValueError( f"Failed to cast data logged to timestamp column {dsschema.timestamp}: {e}" ) - return df + return df[columns] def add_deletes( diff --git a/pyproject.toml b/pyproject.toml index da96109de..228e8a6ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.15" +version = "1.5.16" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]