Add support for HTTP sink (#592)
saiharshavellanki authored Oct 28, 2024
1 parent 1ed7ac4 commit 3c9bf01
Showing 16 changed files with 657 additions and 110 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
@@ -253,6 +253,7 @@ hardcoded
hashable
hashmap
hasnull
healthz
hostname
html
hudi
1 change: 1 addition & 0 deletions docs/api.yml
@@ -73,6 +73,7 @@ sidebar:
- slug: "api-reference/sink_connectors"
title: "Sink Connectors"
pages:
- "api-reference/sink_connectors/http"
- "api-reference/sink_connectors/kafka"
- "api-reference/sink_connectors/s3"
- "api-reference/sink_connectors/snowflake"
76 changes: 76 additions & 0 deletions docs/examples/api-reference/sinks/http_sink.py
@@ -0,0 +1,76 @@
import os
from datetime import datetime

from fennel.testing import mock

__owner__ = "[email protected]"


@mock
def test_http_sink(client):
    os.environ["KAFKA_USERNAME"] = "test"
    os.environ["KAFKA_PASSWORD"] = "test"
    os.environ["SNOWFLAKE_USERNAME"] = "some-name"
    os.environ["SNOWFLAKE_PASSWORD"] = "some-password"
    os.environ["DB_NAME"] = "some-db-name"

    from fennel.connectors import source, Kafka
    from fennel.datasets import dataset, field

    kafka = Kafka(
        name="my_kafka",
        bootstrap_servers="localhost:9092",  # could come via os env var too
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="PLAIN",
        sasl_plain_username=os.environ["KAFKA_USERNAME"],
        sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    )

    # docsnip-highlight next-line
    @source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
    @dataset
    class SomeDataset:
        uid: int = field(key=True)
        email: str
        timestamp: datetime

    from fennel.datasets import pipeline, Dataset
    from fennel.lib.params import inputs

    # docsnip basic
    from fennel.connectors import sink, HTTP

    # docsnip-highlight start
    http = HTTP(
        name="http",
        host="http://http-echo-server.harsha.svc.cluster.local:8081/",
        healthz="/health",
    )
    # docsnip-highlight end

    @dataset
    # docsnip-highlight start
    @sink(
        http.path(endpoint="/sink", limit=1000, headers={"Foo": "Bar"}),
        cdc="debezium",
        how="incremental",
    )
    # docsnip-highlight end
    class SomeDatasetFiltered:
        uid: int = field(key=True)
        email: str
        timestamp: datetime

        @pipeline
        @inputs(SomeDataset)
        def gmail_filtered(cls, dataset: Dataset):
            return dataset.filter(
                lambda row: row["email"].str.contains("gmail.com")
            )

    # /docsnip

    client.commit(
        message="some commit msg", datasets=[SomeDataset, SomeDatasetFiltered]
    )
4 changes: 3 additions & 1 deletion docs/examples/api-reference/sinks/snowflake_sink.py
@@ -55,12 +55,14 @@ class SomeDataset:
from fennel.connectors import sink

@dataset
# docsnip-highlight start
@sink(
snowflake.table("test_table"),
every="1d",
how="incremental",
renames={"uid": "new_uid"},
) # docsnip-highlight
)
# docsnip-highlight end
class SomeDatasetFiltered:
uid: int = field(key=True)
email: str
58 changes: 58 additions & 0 deletions docs/pages/api-reference/sink_connectors/http.md
@@ -0,0 +1,58 @@
---
title: HTTP
order: 0
status: published
---
### HTTP
Data sink to HTTP endpoints.

#### Connector Parameters
<Expandable title="name" type="str">
A name to identify the sink. The name should be unique across all Fennel connectors.
</Expandable>

<Expandable title="host" type="str|Secret">
The HTTP host URL. Example: https://127.0.0.1:8081
</Expandable>

<Expandable title="healthz" type="str">
The health check endpoint to verify the server's availability.
</Expandable>

#### HTTP Path Parameters
<Expandable title="endpoint" type="str">
The specific endpoint where data will be sent.
</Expandable>

<Expandable title="limit" type="Optional[int]">
The number of records to include in each request to the endpoint. Default: 100
</Expandable>

<Expandable title="headers" type="Dict[str,str]">
A map of headers to include with each request.
</Expandable>


<pre snippet="api-reference/sinks/http_sink#basic"
status="success" message="HTTP sink">
</pre>
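
For orientation, the body of each sink request carries Debezium-style change
records (the sink requires `cdc="debezium"`). A sketch of one record is shown
below; only the `payload.source.fennel.partition` and
`payload.source.fennel.offset` fields are documented here, so the surrounding
envelope and all values are illustrative assumptions.

```python
# Hypothetical change record as the HTTP sink might POST it
# (Debezium-style envelope; field values are illustrative).
record = {
    "payload": {
        "before": None,  # prior row image (None for inserts)
        "after": {"uid": 1, "email": "user@gmail.com"},
        "op": "c",  # Debezium op code: c=create, u=update, d=delete
        "source": {
            "fennel": {"partition": 0, "offset": 42},  # dedup keys (see below)
        },
    }
}
```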

#### Errors
<Expandable title="Connectivity Issues">
Fennel tries to validate connectivity with your HTTP sink during `commit` itself
by calling the configured health check endpoint.

Note: the mock client cannot talk to any external data sink and hence is unable
to perform this validation at commit time.
</Expandable>

:::info
- The HTTP sink guarantees at-least-once delivery. To deduplicate, use the
`["payload"]["source"]["fennel"]["partition"]` and `["payload"]["source"]["fennel"]["offset"]`
fields in the output.
:::
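
Since delivery is at-least-once, the receiving service should be idempotent.
Below is a minimal sketch of such an endpoint; the Flask app, routes, and
in-memory bookkeeping are illustrative assumptions, not part of Fennel — only
the partition/offset dedup keys come from the note above.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)
seen: set = set()  # (partition, offset) pairs already applied

@app.route("/health")
def health():
    # Health check endpoint, matching the `healthz` configured on the sink.
    return "ok"

@app.route("/sink", methods=["POST"])
def sink():
    records = request.get_json()  # assumed: a JSON array of change records
    applied = 0
    for record in records:
        meta = record["payload"]["source"]["fennel"]
        key = (meta["partition"], meta["offset"])
        if key in seen:  # duplicate delivery from a retry; skip it
            continue
        seen.add(key)
        applied += 1
        # ... apply record["payload"] to downstream storage here ...
    return jsonify({"applied": applied})
```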

3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [1.5.44] - 2024-10-24
- Add support for HTTP Sink

## [1.5.43] - 2024-10-23
- Add support for keyless S3 and Snowflake sinks

2 changes: 2 additions & 0 deletions fennel/connectors/__init__.py
@@ -16,6 +16,8 @@
SINK_FIELD,
DataConnector,
DataSource,
HTTP,
HTTPConnector,
S3Connector,
KafkaConnector,
TableConnector,
95 changes: 89 additions & 6 deletions fennel/connectors/connectors.py
@@ -147,6 +147,9 @@ def source(
f"{', '.join(conn.required_fields())}."
)

if isinstance(conn, HTTPConnector):
raise TypeError("HTTP Source is not supported yet")

if (
isinstance(conn, S3Connector)
and conn.format == "delta"
@@ -267,20 +270,35 @@ def sink(
and not isinstance(conn.data_source, Snowflake)
)
)
and not isinstance(conn, HTTPConnector)
):
raise ValueError(
"Sink is only supported for Kafka, S3 and Snowflake, found %s"
"Sink is only supported for Kafka, S3, Snowflake and HTTP, found %s"
% type(conn),
)

if isinstance(conn, KafkaConnector):
if cdc != "debezium":
raise ValueError('Sink only support "debezium" cdc, found %s' % cdc)

raise ValueError(
'Kafka sink only supports "debezium" cdc, found %s' % cdc
)
if conn.format != "json":
raise ValueError(
'Sink only support "json" format for now, found %s' % cdc
'Kafka sink only supports "json" format for now, found %s' % conn.format
)
if every:
raise ValueError('"every" should not be set for Kafka sink')
if how is not None and how != "incremental":
raise ValueError(
'Only "incremental" style supported for Kafka sink'
)
if renames and len(renames) > 0:
raise ValueError("Renames are not supported for Kafka sink")
if since or until:
raise ValueError(
'"since" and "until" are not supported for HTTP sink'
)

if isinstance(conn, S3Connector):
if cdc:
raise ValueError("CDC shouldn't be set for S3 sink")
@@ -310,20 +328,40 @@
"Only Incremental style supported for Snowflake sink"
)

if isinstance(conn, HTTPConnector):
if cdc != "debezium":
raise ValueError(
'HTTP sink only supports "debezium" cdc, found %s' % cdc
)
if every:
raise ValueError('"every" should not be set for HTTP sink')
if how is not None and how != "incremental":
raise ValueError('Only "incremental" style supported for HTTP sink')
if renames and len(renames) > 0:
raise ValueError("Renames are not supported for HTTP sink")
if since or until:
raise ValueError(
'"since" and "until" are not supported for HTTP sink'
)

def decorator(dataset_cls: T):
conn.cdc = cdc
conn.every = every
conn.how = how
conn.renames = renames
# Always pass this as True for now
conn.create = True
conn.since = since
conn.until = until
conn.envs = EnvSelector(env)
connectors = getattr(dataset_cls, SINK_FIELD, [])
connectors.append(conn)
setattr(dataset_cls, SINK_FIELD, connectors)

# Always set create as False for realtime sinks and True for batch sinks
if isinstance(conn, KafkaConnector) or isinstance(conn, HTTPConnector):
conn.create = False
else:
conn.create = True

return dataset_cls

return decorator
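
# Usage sketch (illustrative, not part of this module): the checks above mean
# an HTTP sink accepts only cdc="debezium" with how="incremental" (or unset),
# and rejects every/renames/since/until. For example:
#
#     http = HTTP(name="http", host="http://127.0.0.1:8081", healthz="/health")
#     sink(http.path(endpoint="/sink"), cdc="debezium", how="incremental")  # ok
#     sink(http.path(endpoint="/sink"), cdc="upsert")  # raises ValueError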
@@ -722,6 +760,26 @@ def identifier(self) -> str:
return f"[PubSub: {self.name}]"


class HTTP(DataSource):
host: Union[str, Secret]
healthz: str

def required_fields(self) -> List[str]:
return ["endpoint"]

def path(
self,
endpoint: str,
limit: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
) -> HTTPConnector:
return HTTPConnector(self, endpoint, limit, headers)

@staticmethod
def get(name: str) -> HTTP:
return HTTP(name=name, _get=True, host="", healthz="")
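
# Usage sketch (illustrative): HTTP.get returns a stub that references an
# HTTP connector already defined elsewhere under this name, without
# restating host/healthz:
#
#     http = HTTP.get(name="http")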


# ------------------------------------------------------------------------------
# DataConnector
# ------------------------------------------------------------------------------
@@ -994,6 +1052,31 @@ def identifier(self) -> str:
return f"{self.data_source.identifier()}(topic={self.topic_id}, format={self.format})"


class HTTPConnector(DataConnector):
"""
HTTP is a DataConnector that pushes data to customer's HTTP endpoint
"""

endpoint: str
limit: Optional[int]
headers: Optional[Dict[str, str]]

def __init__(
self,
data_source: DataSource,
endpoint: str,
limit: Optional[int],
headers: Optional[Dict[str, str]],
):
self.data_source = data_source
self.endpoint = endpoint
self.limit = limit
self.headers = headers

def identifier(self) -> str:
return f"{self.data_source.identifier()}(endpoint={self.endpoint})"


def is_table_source(con: DataConnector) -> bool:
if isinstance(
con,