
Commit

Merge pull request #74 from ExpediaGroup/feature/push_source_pydantic_model

feat: push source pydantic model
michaelbackes authored Jan 11, 2024
2 parents d248fd7 + b3593f0 commit 02968df
Showing 3 changed files with 102 additions and 4 deletions.
64 changes: 62 additions & 2 deletions sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
@@ -12,7 +12,7 @@
from pydantic import Field as PydanticField
from typing_extensions import Annotated, Self

from feast.data_source import KafkaSource, RequestSource
from feast.data_source import KafkaSource, PushSource, RequestSource
from feast.expediagroup.pydantic_models.field_model import FieldModel
from feast.expediagroup.pydantic_models.stream_format_model import (
AnyStreamFormat,
@@ -170,6 +170,66 @@ def from_data_source(
]


SUPPORTED_PUSH_BATCH_SOURCES = [SparkSourceModel]


class PushSourceModel(DataSourceModel):
"""
Pydantic Model of a Feast PushSource.
"""

name: str
model_type: Literal["PushSourceModel"] = "PushSourceModel"
batch_source: AnyBatchDataSource
description: Optional[str] = ""
tags: Optional[Dict[str, str]] = None
owner: Optional[str] = ""

def to_data_source(self) -> PushSource:
"""
Given a Pydantic PushSourceModel, create and return a PushSource.
Returns:
A PushSource.
"""
return PushSource(
name=self.name,
batch_source=self.batch_source.to_data_source(),
description=self.description,
tags=self.tags,
owner=self.owner,
)

@classmethod
def from_data_source(
cls,
data_source,
) -> Self: # type: ignore
"""
Converts a PushSource object to its pydantic model representation.
Returns:
A PushSourceModel.
"""
class_ = getattr(
sys.modules[__name__],
type(data_source.batch_source).__name__ + "Model",
)
if class_ not in SUPPORTED_PUSH_BATCH_SOURCES:
raise ValueError(
"Push Source's batch source type is not a supported data source type."
)
batch_source = class_.from_data_source(data_source.batch_source)

return cls(
name=data_source.name,
batch_source=batch_source,
description=data_source.description,
tags=data_source.tags,
owner=data_source.owner,
)
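
A minimal usage sketch of the conversion pair above. The SparkSource import path and the constructor fields are assumptions based on feast's contrib layout and the test later in this diff, not part of the change itself:

from feast.data_source import PushSource
from feast.expediagroup.pydantic_models.data_source_model import PushSourceModel
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

# Illustrative objects only; field values are hypothetical.
push_source = PushSource(
    name="push-source",
    batch_source=SparkSource(name="spark-source", table="thingy"),
)

model = PushSourceModel.from_data_source(push_source)  # DataSource -> pydantic model
restored = model.to_data_source()                      # pydantic model -> DataSource
assert restored == push_source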


SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, JsonFormatModel, ProtoFormatModel]
SUPPORTED_KAFKA_BATCH_SOURCES = [RequestSourceModel, SparkSourceModel]

@@ -278,6 +338,6 @@ def from_data_source(
# https://blog.devgenius.io/deserialize-child-classes-with-pydantic-that-gonna-work-784230e1cf83
# This lets us discriminate child classes of DataSourceModel with type hints.
AnyDataSource = Annotated[
Union[RequestSourceModel, SparkSourceModel, KafkaSourceModel],
Union[RequestSourceModel, SparkSourceModel, KafkaSourceModel, PushSourceModel],
PydanticField(discriminator="model_type"),
]
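
A short sketch of what the discriminator buys us (pydantic v1 style, matching the parse_obj/parse_raw calls elsewhere in this PR): a serialized payload is routed back to the correct subclass by its model_type field. The payload fields here are hypothetical.

from pydantic import parse_obj_as

from feast.expediagroup.pydantic_models.data_source_model import (
    AnyDataSource,
    PushSourceModel,
)

payload = {
    "model_type": "PushSourceModel",
    "name": "push-source",
    "batch_source": {
        "model_type": "SparkSourceModel",  # assumed discriminator value
        "name": "spark-source",
        "table": "thingy",
    },
}
model = parse_obj_as(AnyDataSource, payload)  # discriminator picks the subclass
assert isinstance(model, PushSourceModel)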
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/registry/http.py
@@ -11,7 +11,7 @@

from feast import usage
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource, RequestSource
from feast.data_source import DataSource, PushSource, RequestSource
from feast.entity import Entity
from feast.errors import (
DataSourceObjectNotFoundException,
@@ -21,6 +21,7 @@
ProjectMetadataNotFoundException,
)
from feast.expediagroup.pydantic_models.data_source_model import (
PushSourceModel,
RequestSourceModel,
SparkSourceModel,
)
@@ -222,6 +223,10 @@ def apply_data_source(
data = RequestSourceModel.from_data_source(data_source).json()
response_data = self._send_request("PUT", url, params=params, data=data)
return RequestSourceModel.parse_obj(response_data).to_data_source()
elif isinstance(data_source, PushSource):
data = PushSourceModel.from_data_source(data_source).json()
response_data = self._send_request("PUT", url, params=params, data=data)
return PushSourceModel.parse_obj(response_data).to_data_source()
else:
raise TypeError(
"Unsupported DataSource type. Please use either SparkSource or RequestSource only"
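
Conceptually, the new branch performs the round trip below. This is a self-contained simulation under the same hypothetical objects as above; the actual _send_request transport is elided:

import json

from feast.data_source import PushSource
from feast.expediagroup.pydantic_models.data_source_model import PushSourceModel
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

push_source = PushSource(
    name="push-source",
    batch_source=SparkSource(name="spark-source", table="thingy"),
)

body = PushSourceModel.from_data_source(push_source).json()  # PUT request body
response_data = json.loads(body)  # stand-in for the registry's echoed response
restored = PushSourceModel.parse_obj(response_data).to_data_source()
assert restored == push_source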
35 changes: 34 additions & 1 deletion sdk/python/tests/unit/test_pydantic_models.py
@@ -19,11 +19,12 @@
from pydantic import BaseModel

from feast.data_format import AvroFormat
from feast.data_source import KafkaSource, RequestSource
from feast.data_source import KafkaSource, PushSource, RequestSource
from feast.entity import Entity
from feast.expediagroup.pydantic_models.data_source_model import (
AnyDataSource,
KafkaSourceModel,
PushSourceModel,
RequestSourceModel,
SparkSourceModel,
)
@@ -209,6 +210,38 @@ def test_idempotent_sparksource_conversion():
assert pydantic_obj == SparkSourceModel.parse_obj(pydantic_json)


def test_idempotent_pushsource_conversion():
spark_source = SparkSource(
name="spark-source",
table="thingy",
description="desc",
tags={},
owner="feast",
)

python_obj = PushSource(
name="push-source",
batch_source=spark_source,
description="desc",
tags={},
owner="feast",
)

pydantic_obj = PushSourceModel.from_data_source(python_obj)
converted_python_obj = pydantic_obj.to_data_source()
assert python_obj == converted_python_obj

feast_proto = converted_python_obj.to_proto()
python_obj_from_proto = PushSource.from_proto(feast_proto)
assert python_obj == python_obj_from_proto

pydantic_json = pydantic_obj.json()
assert pydantic_obj == PushSourceModel.parse_raw(pydantic_json)

pydantic_json = pydantic_obj.dict()
assert pydantic_obj == PushSourceModel.parse_obj(pydantic_json)


def test_idempotent_kafkasource_conversion():
schema = [
Field(name="f1", dtype=Float32),
