Skip to content

Commit

Permalink
added support for secrets manager (#557)
Browse files Browse the repository at this point in the history
* added support for secrets manager

* fixed lint and union type

* updated changelog

---------

Co-authored-by: Hemanth Kannekanti <[email protected]>
  • Loading branch information
hemanthk269 and Hemanth Kannekanti authored Sep 30, 2024
1 parent ffdf10e commit f4e5ace
Show file tree
Hide file tree
Showing 32 changed files with 2,254 additions and 632 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.34] - 2024-09-30
- Add support for secrets manager in connectors.

## [1.5.33] - 2024-09-30
- Add support for now in expressions.

Expand Down
2 changes: 2 additions & 0 deletions fennel/_vendor/pydantic/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ class IPv4NetworkError(PydanticValueError):
class IPv6NetworkError(PydanticValueError):
msg_template = 'value is not a valid IPv6 network'

class AWSSecretError(PydanticValueError):
msg_template = 'value is not a valid AWS secret'

class IPv4InterfaceError(PydanticValueError):
msg_template = 'value is not a valid IPv4 interface'
Expand Down
14 changes: 14 additions & 0 deletions fennel/_vendor/pydantic/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
is_typeddict,
)
from .utils import almost_equal_floats, lenient_issubclass, sequence_like
from ...integrations.aws import Secret

if TYPE_CHECKING:
from fennel._vendor.typing_extensions import Literal, TypedDict
Expand Down Expand Up @@ -252,6 +253,10 @@ def dict_validator(v: Any) -> Dict[Any, Any]:
if isinstance(v, dict):
return v

# without this check, we get a recursive loop which tries to convert secret to dict by calling __getitem__
if isinstance(v, Secret):
raise errors.DictError()

try:
return dict(v)
except (TypeError, ValueError):
Expand Down Expand Up @@ -413,6 +418,14 @@ def ip_v6_network_validator(v: Any) -> IPv6Network:
except ValueError:
raise errors.IPv6NetworkError()

def aws_secret_validator(v: Any) -> Secret:
if isinstance(v, Secret):
return v
try:
return Secret(v)
except ValueError:
raise errors.AWSSecretError()


def ip_v4_interface_validator(v: Any) -> IPv4Interface:
if isinstance(v, IPv4Interface):
Expand Down Expand Up @@ -692,6 +705,7 @@ def check(self, config: Type['BaseConfig']) -> bool:
(IPv6Address, [ip_v6_address_validator]),
(IPv4Network, [ip_v4_network_validator]),
(IPv6Network, [ip_v6_network_validator]),
(Secret, [aws_secret_validator]),
]


Expand Down
51 changes: 26 additions & 25 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from fennel._vendor.pydantic import validator # type: ignore
from fennel.connectors.kinesis import at_timestamp
from fennel.expr.expr import Expr, TypedExpr
from fennel.integrations.aws import Secret
from fennel.internal_lib.duration import (
Duration,
)
Expand Down Expand Up @@ -325,8 +326,8 @@ def identifier(self) -> str:
class SQLSource(DataSource):
host: str
db_name: str
username: str
password: str
username: Union[str, Secret]
password: Union[str, Secret]
jdbc_params: Optional[str] = None
_get: bool = False

Expand All @@ -341,8 +342,8 @@ class CSV:


class S3(DataSource):
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]
aws_access_key_id: Optional[Union[str, Secret]]
aws_secret_access_key: Optional[Union[str, Secret]]
role_arn: Optional[str]

def bucket(
Expand Down Expand Up @@ -372,8 +373,8 @@ def get(name: str) -> S3:
return S3(
name=name,
_get=True,
aws_access_key_id="",
aws_secret_access_key="",
aws_access_key_id=None,
aws_secret_access_key=None,
role_arn=None,
)

Expand All @@ -384,7 +385,7 @@ def identifier(self) -> str:
class BigQuery(DataSource):
project_id: str
dataset_id: str
service_account_key: dict[str, str]
service_account_key: Union[dict[str, str], Secret]

def table(self, table_name: str, cursor: str) -> TableConnector:
return TableConnector(self, table_name, cursor)
Expand All @@ -409,25 +410,25 @@ def identifier(self) -> str:
class Avro(BaseModel):
registry: str
url: str
username: Optional[str]
password: Optional[str]
token: Optional[str]
username: Optional[Union[str, Secret]]
password: Optional[Union[str, Secret]]
token: Optional[Union[str, Secret]]


class Protobuf(BaseModel):
registry: str
url: str
username: Optional[str]
password: Optional[str]
token: Optional[str]
username: Optional[Union[str, Secret]]
password: Optional[Union[str, Secret]]
token: Optional[Union[str, Secret]]

def __init__(
self,
registry: str,
url: str,
username: Optional[str] = None,
password: Optional[str] = None,
token: Optional[str] = None,
username: Optional[Union[str, Secret]] = None,
password: Optional[Union[str, Secret]] = None,
token: Optional[Union[str, Secret]] = None,
):
if username or password:
if token is not None:
Expand All @@ -453,8 +454,8 @@ class Kafka(DataSource):
bootstrap_servers: str
security_protocol: Literal["PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL"]
sasl_mechanism: Literal["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI"]
sasl_plain_username: Optional[str]
sasl_plain_password: Optional[str]
sasl_plain_username: Optional[Union[str, Secret]]
sasl_plain_password: Optional[Union[str, Secret]]

def required_fields(self) -> List[str]:
return ["topic"]
Expand Down Expand Up @@ -525,8 +526,8 @@ def identifier(self) -> str:
class Snowflake(DataSource):
account: str
db_name: str
username: str
password: str
username: Union[str, Secret]
password: Union[str, Secret]
warehouse: str
src_schema: str = Field(alias="schema")
role: str
Expand Down Expand Up @@ -584,8 +585,8 @@ def required_fields(self) -> List[str]:

class Redshift(DataSource):
s3_access_role_arn: Optional[str]
username: Optional[str]
password: Optional[str]
username: Optional[Union[str, Secret]]
password: Optional[Union[str, Secret]]
db_name: str
host: str
port: int = 5439
Expand Down Expand Up @@ -617,8 +618,8 @@ def identifier(self) -> str:
class Mongo(DataSource):
host: str
db_name: str
username: str
password: str
username: Union[str, Secret]
password: Union[str, Secret]

def collection(self, collection_name: str, cursor: str) -> TableConnector:
return TableConnector(self, table_name=collection_name, cursor=cursor)
Expand All @@ -643,7 +644,7 @@ def identifier(self) -> str:

class PubSub(DataSource):
project_id: str
service_account_key: dict[str, str]
service_account_key: Union[dict[str, str], Secret]

def required_fields(self) -> List[str]:
return ["topic_id", "format"]
Expand Down
Loading

0 comments on commit f4e5ace

Please sign in to comment.