diff --git a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
index 6aece6025b..ba17545593 100644
--- a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
+++ b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
@@ -12,7 +12,7 @@
 from pydantic import Field as PydanticField
 from typing_extensions import Annotated, Self
 
-from feast.data_source import KafkaSource, RequestSource
+from feast.data_source import KafkaSource, PushSource, RequestSource
 from feast.expediagroup.pydantic_models.field_model import FieldModel
 from feast.expediagroup.pydantic_models.stream_format_model import (
     AnyStreamFormat,
@@ -170,6 +170,68 @@ def from_data_source(
 ]
 
 
+SUPPORTED_PUSH_BATCH_SOURCES = [SparkSourceModel]
+
+
+class PushSourceModel(DataSourceModel):
+    """
+    Pydantic Model of a Feast PushSource.
+    """
+
+    name: str
+    model_type: Literal["PushSourceModel"] = "PushSourceModel"
+    batch_source: AnyBatchDataSource
+    description: Optional[str] = ""
+    tags: Optional[Dict[str, str]] = None
+    owner: Optional[str] = ""
+
+    def to_data_source(self) -> PushSource:
+        """
+        Given a Pydantic PushSourceModel, create and return a PushSource.
+
+        Returns:
+            A PushSource.
+        """
+        return PushSource(
+            name=self.name,
+            batch_source=self.batch_source.to_data_source(),
+            description=self.description,
+            tags=self.tags,
+            owner=self.owner,
+        )
+
+    @classmethod
+    def from_data_source(
+        cls,
+        data_source,
+    ) -> Self:  # type: ignore
+        """
+        Converts a PushSource object to its pydantic model representation.
+
+        Returns:
+            A PushSourceModel.
+        """
+        # Resolve the pydantic model class for the batch source by class name,
+        # e.g. SparkSource -> SparkSourceModel.
+        class_ = getattr(
+            sys.modules[__name__],
+            type(data_source.batch_source).__name__ + "Model",
+        )
+        if class_ not in SUPPORTED_PUSH_BATCH_SOURCES:
+            raise ValueError(
+                "Push Source's batch source type is not a supported data source type."
+            )
+        batch_source = class_.from_data_source(data_source.batch_source)
+
+        return cls(
+            name=data_source.name,
+            batch_source=batch_source,
+            description=data_source.description,
+            tags=data_source.tags,
+            owner=data_source.owner,
+        )
+
+
 SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, JsonFormatModel, ProtoFormatModel]
 SUPPORTED_KAFKA_BATCH_SOURCES = [RequestSourceModel, SparkSourceModel]
 
@@ -278,6 +340,6 @@ def from_data_source(
 # https://blog.devgenius.io/deserialize-child-classes-with-pydantic-that-gonna-work-784230e1cf83
 # This lets us discriminate child classes of DataSourceModel with type hints.
 AnyDataSource = Annotated[
-    Union[RequestSourceModel, SparkSourceModel, KafkaSourceModel],
+    Union[RequestSourceModel, SparkSourceModel, KafkaSourceModel, PushSourceModel],
     PydanticField(discriminator="model_type"),
 ]
diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py
index a4c403dd49..2e3c5fac5c 100644
--- a/sdk/python/feast/infra/registry/http.py
+++ b/sdk/python/feast/infra/registry/http.py
@@ -11,7 +11,7 @@
 
 from feast import usage
 from feast.base_feature_view import BaseFeatureView
-from feast.data_source import DataSource, RequestSource
+from feast.data_source import DataSource, PushSource, RequestSource
 from feast.entity import Entity
 from feast.errors import (
     DataSourceObjectNotFoundException,
@@ -21,6 +21,7 @@
     ProjectMetadataNotFoundException,
 )
 from feast.expediagroup.pydantic_models.data_source_model import (
+    PushSourceModel,
     RequestSourceModel,
     SparkSourceModel,
 )
@@ -222,6 +223,10 @@ def apply_data_source(
             data = RequestSourceModel.from_data_source(data_source).json()
             response_data = self._send_request("PUT", url, params=params, data=data)
             return RequestSourceModel.parse_obj(response_data).to_data_source()
+        elif isinstance(data_source, PushSource):
+            data = PushSourceModel.from_data_source(data_source).json()
+            response_data = self._send_request("PUT", url, params=params, data=data)
+            return PushSourceModel.parse_obj(response_data).to_data_source()
         else:
             raise TypeError(
-                "Unsupported DataSource type. Please use either SparkSource or RequestSource only"
+                "Unsupported DataSource type. Please use SparkSource, RequestSource, or PushSource only"
diff --git a/sdk/python/tests/unit/test_pydantic_models.py b/sdk/python/tests/unit/test_pydantic_models.py
index e5a90f7c84..1ab11df385 100644
--- a/sdk/python/tests/unit/test_pydantic_models.py
+++ b/sdk/python/tests/unit/test_pydantic_models.py
@@ -19,11 +19,12 @@
 from pydantic import BaseModel
 
 from feast.data_format import AvroFormat
-from feast.data_source import KafkaSource, RequestSource
+from feast.data_source import KafkaSource, PushSource, RequestSource
 from feast.entity import Entity
 from feast.expediagroup.pydantic_models.data_source_model import (
     AnyDataSource,
     KafkaSourceModel,
+    PushSourceModel,
     RequestSourceModel,
     SparkSourceModel,
 )
@@ -209,6 +210,38 @@ def test_idempotent_sparksource_conversion():
     assert pydantic_obj == SparkSourceModel.parse_obj(pydantic_json)
 
 
+def test_idempotent_pushsource_conversion():
+    spark_source = SparkSource(
+        name="spark-source",
+        table="thingy",
+        description="desc",
+        tags={},
+        owner="feast",
+    )
+
+    python_obj = PushSource(
+        name="push-source",
+        batch_source=spark_source,
+        description="desc",
+        tags={},
+        owner="feast",
+    )
+
+    pydantic_obj = PushSourceModel.from_data_source(python_obj)
+    converted_python_obj = pydantic_obj.to_data_source()
+    assert python_obj == converted_python_obj
+
+    feast_proto = converted_python_obj.to_proto()
+    python_obj_from_proto = PushSource.from_proto(feast_proto)
+    assert python_obj == python_obj_from_proto
+
+    pydantic_json = pydantic_obj.json()
+    assert pydantic_obj == PushSourceModel.parse_raw(pydantic_json)
+
+    pydantic_json = pydantic_obj.dict()
+    assert pydantic_obj == PushSourceModel.parse_obj(pydantic_json)
+
+
 def test_idempotent_kafkasource_conversion():
     schema = [
         Field(name="f1", dtype=Float32),