diff --git a/.gitignore b/.gitignore index e4e82bfce4..4fadcbb6cc 100644 --- a/.gitignore +++ b/.gitignore @@ -223,4 +223,8 @@ ui/.vercel **/yarn-error.log* # Go subprocess binaries (built during feast pip package building) -sdk/python/feast/binaries/ \ No newline at end of file +sdk/python/feast/binaries/ +/sdk/python/feast/open_api/ +/sdk/python/feast/test_open_api/ + +venv39/* \ No newline at end of file diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index c9e38bf344..f4351b6d07 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -26,6 +26,7 @@ import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "feast/core/DataSource.proto"; import "feast/core/Feature.proto"; +import "feast/core/Entity.proto"; message FeatureView { // User-specified specifications of this feature view. @@ -75,6 +76,11 @@ message FeatureViewSpec { // Whether these features should be served online or not bool online = 8; + + // User-specified specifications of this entity. 
+ // Adding higher index to avoid conflicts in future + // if Feast adds more fields + repeated Entity original_entities = 30; } message FeatureViewMeta { @@ -91,4 +97,4 @@ message FeatureViewMeta { message MaterializationInterval { google.protobuf.Timestamp start_time = 1; google.protobuf.Timestamp end_time = 2; -} +} \ No newline at end of file diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 9097e40c94..45fd608fa9 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -66,6 +66,11 @@ def __init__(self, name, project=None): super().__init__(f"On demand feature view {name} does not exist") +class ProjectMetadataNotFoundException(FeastObjectNotFoundException): + def __init__(self, project: str = None): + super().__init__(f"Project Metadata does not exist in project {project}") + + class RequestDataNotFoundInEntityDfException(FeastObjectNotFoundException): def __init__(self, feature_name, feature_view_name): super().__init__( diff --git a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py index 83419af6fb..31f6c1239e 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py @@ -8,10 +8,10 @@ from pydantic import BaseModel from pydantic import Field as PydanticField -from typing_extensions import Annotated +from typing_extensions import Annotated, Self from feast.data_source import RequestSource -from feast.field import Field +from feast.expediagroup.pydantic_models.field_model import FieldModel from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, ) @@ -49,41 +49,31 @@ class RequestSourceModel(DataSourceModel): name: str model_type: Literal["RequestSourceModel"] = "RequestSourceModel" - schema_: List[Field] = PydanticField(None, alias="schema") + schema_: List[FieldModel] description: Optional[str] = "" 
tags: Optional[Dict[str, str]] = None owner: Optional[str] = "" - class Config: - arbitrary_types_allowed = True - extra = "allow" - - def to_data_source(self): + def to_data_source(self) -> RequestSource: """ Given a Pydantic RequestSourceModel, create and return a RequestSource. Returns: A RequestSource. """ - params = { - "name": self.name, - "description": self.description, - "tags": self.tags if self.tags else None, - "owner": self.owner, - } - params["schema"] = [ - Field( - name=sch.name, - dtype=sch.dtype, - description=sch.description, - tags=sch.tags, - ) - for sch in self.schema_ - ] - return RequestSource(**params) + return RequestSource( + name=self.name, + schema=[sch.to_field() for sch in self.schema_], + description=self.description, + tags=self.tags, + owner=self.owner, + ) @classmethod - def from_data_source(cls, data_source): + def from_data_source( + cls, + data_source, + ) -> Self: # type: ignore """ Converts a RequestSource object to its pydantic model representation. @@ -92,7 +82,9 @@ def from_data_source(cls, data_source): """ return cls( name=data_source.name, - schema=data_source.schema, + schema_=[ + FieldModel.from_field(ds_schema) for ds_schema in data_source.schema + ], description=data_source.description, tags=data_source.tags if data_source.tags else None, owner=data_source.owner, @@ -117,11 +109,7 @@ class SparkSourceModel(DataSourceModel): owner: Optional[str] = "" timestamp_field: Optional[str] = None - class Config: - arbitrary_types_allowed = True - extra = "allow" - - def to_data_source(self): + def to_data_source(self) -> SparkSource: """ Given a Pydantic SparkSourceModel, create and return a SparkSource. 
@@ -130,24 +118,23 @@ def to_data_source(self): """ return SparkSource( name=self.name, - table=self.table if hasattr(self, "table") else "", - query=self.query if hasattr(self, "query") else "", - path=self.path if hasattr(self, "path") else "", - file_format=self.file_format if hasattr(self, "file_format") else "", - created_timestamp_column=self.created_timestamp_column - if hasattr(self, "created_timestamp_column") - else "", - field_mapping=self.field_mapping if self.field_mapping else None, - description=self.description or "", - tags=self.tags if self.tags else None, - owner=self.owner or "", - timestamp_field=self.timestamp_field - if hasattr(self, "timestamp_field") - else "", + table=self.table, + query=self.query, + path=self.path, + file_format=self.file_format, + created_timestamp_column=self.created_timestamp_column, + field_mapping=self.field_mapping, + description=self.description, + tags=self.tags, + owner=self.owner, + timestamp_field=self.timestamp_field, ) @classmethod - def from_data_source(cls, data_source): + def from_data_source( + cls, + data_source, + ) -> Self: # type: ignore """ Converts a SparkSource object to its pydantic model representation. 
@@ -160,18 +147,12 @@ def from_data_source(cls, data_source): query=data_source.query, path=data_source.path, file_format=data_source.file_format, - created_timestamp_column=data_source.created_timestamp_column - if data_source.created_timestamp_column - else "", - field_mapping=data_source.field_mapping - if data_source.field_mapping - else None, - description=data_source.description if data_source.description else "", - tags=data_source.tags if data_source.tags else None, - owner=data_source.owner if data_source.owner else "", - timestamp_field=data_source.timestamp_field - if data_source.timestamp_field - else "", + created_timestamp_column=data_source.created_timestamp_column, + field_mapping=data_source.field_mapping, + description=data_source.description, + tags=data_source.tags, + owner=data_source.owner, + timestamp_field=data_source.timestamp_field, ) diff --git a/sdk/python/feast/expediagroup/pydantic_models/entity_model.py b/sdk/python/feast/expediagroup/pydantic_models/entity_model.py index e46e65924d..0d1e754cb7 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/entity_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/entity_model.py @@ -10,6 +10,7 @@ from typing import Callable, Dict, Optional from pydantic import BaseModel +from typing_extensions import Self from feast.entity import Entity from feast.value_type import ValueType @@ -36,7 +37,7 @@ class Config: ValueType: lambda v: int(dumps(v.value, default=str)) } - def to_entity(self): + def to_entity(self) -> Entity: """ Given a Pydantic EntityModel, create and return an Entity. 
@@ -51,12 +52,15 @@ def to_entity(self): tags=self.tags if self.tags else None, owner=self.owner, ) - entity.created_timestamp = (self.created_timestamp,) + entity.created_timestamp = self.created_timestamp entity.last_updated_timestamp = self.last_updated_timestamp return entity @classmethod - def from_entity(cls, entity): + def from_entity( + cls, + entity, + ) -> Self: # type: ignore """ Converts an entity object to its pydantic model representation. diff --git a/sdk/python/feast/expediagroup/pydantic_models/feature_service.py b/sdk/python/feast/expediagroup/pydantic_models/feature_service.py new file mode 100644 index 0000000000..2e4893afb7 --- /dev/null +++ b/sdk/python/feast/expediagroup/pydantic_models/feature_service.py @@ -0,0 +1,81 @@ +import sys +from datetime import datetime +from typing import Dict, List, Optional, Union + +from pydantic import BaseModel +from typing_extensions import Self + +from feast.expediagroup.pydantic_models.feature_view_model import ( + FeatureViewModel, + FeatureViewProjectionModel, + OnDemandFeatureViewModel, +) +from feast.feature_service import FeatureService + + +class FeatureServiceModel(BaseModel): + """ + Pydantic model for Feast FeatureService + """ + + name: str + features: List[Union[FeatureViewModel, OnDemandFeatureViewModel]] + feature_view_projections: List[FeatureViewProjectionModel] + description: str + tags: Dict[str, str] + owner: str + created_timestamp: Optional[datetime] + last_updated_timestamp: Optional[datetime] + # TODO: logging_config option is not supported temporarily. + # we will add this functionality to FeatureServiceModel in future. 
+ # logging_config: Optional[LoggingConfig] = None + + def to_feature_service(self) -> FeatureService: + fs = FeatureService( + name=self.name, + features=[feature.to_feature_view() for feature in self.features], + description=self.description, + tags=self.tags, + owner=self.owner, + ) + + fs.feature_view_projections = [ + feature_view_projection.to_feature_view_projection() + for feature_view_projection in self.feature_view_projections + ] + fs.created_timestamp = self.created_timestamp + fs.last_updated_timestamp = self.last_updated_timestamp + + return fs + + @classmethod + def from_feature_service( + cls, + feature_service: FeatureService, + ) -> Self: # type: ignore + + features = [] + for feature in feature_service._features: + class_ = getattr( + sys.modules[__name__], + type(feature).__name__ + "Model", + ) + features.append(class_.from_feature_view(feature)) + + feature_view_projections = [ + FeatureViewProjectionModel.from_feature_view_projection( + feature_view_projection + ) + for feature_view_projection in feature_service.feature_view_projections + ] + + return cls( + name=feature_service.name, + features=features, + feature_view_projections=feature_view_projections, + description=feature_service.description, + tags=feature_service.tags, + owner=feature_service.owner, + created_timestamp=feature_service.created_timestamp, + last_updated_timestamp=feature_service.last_updated_timestamp, + ) diff --git a/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py b/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py index f9b99e4b3e..058485dcb3 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py @@ -5,55 +5,70 @@ Author: matcarlin@expediagroup.com """ import sys -from datetime import timedelta -from json import dumps -from typing import Callable, Dict, List, Optional +from datetime import datetime, timedelta +from typing import 
Dict, List, Optional, Tuple +import dill from pydantic import BaseModel +from typing_extensions import Self -from feast.data_source import DataSource -from feast.entity import Entity from feast.expediagroup.pydantic_models.data_source_model import ( AnyDataSource, RequestSourceModel, SparkSourceModel, ) from feast.expediagroup.pydantic_models.entity_model import EntityModel +from feast.expediagroup.pydantic_models.field_model import FieldModel from feast.feature_view import FeatureView -from feast.field import Field -from feast.types import ComplexFeastType, PrimitiveFeastType +from feast.feature_view_projection import FeatureViewProjection +from feast.on_demand_feature_view import OnDemandFeatureView SUPPORTED_DATA_SOURCES = [RequestSourceModel, SparkSourceModel] -class FeatureViewModel(BaseModel): +class BaseFeatureViewModel(BaseModel): + """ + Pydantic Model of a Feast BaseFeatureView. + """ + + def to_feature_view(self): + """ + Given a Pydantic BaseFeatureViewModel, create and return a FeatureView. + + Returns: + A FeatureView. + """ + raise NotImplementedError + + @classmethod + def from_feature_view(cls, feature_view): + """ + Converts a FeatureView object to its pydantic model representation. + + Returns: + A BaseFeatureViewModel. + """ + raise NotImplementedError + + +class FeatureViewModel(BaseFeatureViewModel): """ Pydantic Model of a Feast FeatureView. 
""" name: str original_entities: List[EntityModel] = [] - original_schema: Optional[List[Field]] = None + original_schema: Optional[List[FieldModel]] ttl: Optional[timedelta] batch_source: AnyDataSource stream_source: Optional[AnyDataSource] - online: bool = True - description: str = "" - tags: Optional[Dict[str, str]] = None - owner: str = "" - - class Config: - arbitrary_types_allowed = True - extra = "allow" - json_encoders: Dict[object, Callable] = { - Field: lambda v: int(dumps(v.value, default=str)), - DataSource: lambda v: v.to_pydantic_model(), - Entity: lambda v: v.to_pydantic_model(), - ComplexFeastType: lambda v: str(v), - PrimitiveFeastType: lambda v: str(v), - } + online: bool + description: str + tags: Optional[Dict[str, str]] + owner: str + materialization_intervals: List[Tuple[datetime, datetime]] = [] - def to_feature_view(self): + def to_feature_view(self) -> FeatureView: """ Given a Pydantic FeatureViewModel, create and return a FeatureView. @@ -78,7 +93,9 @@ def to_feature_view(self): feature_view = FeatureView( name=self.name, source=source, - schema=self.original_schema, + schema=[sch.to_field() for sch in self.original_schema] + if self.original_schema is not None + else None, entities=[entity.to_entity() for entity in self.original_entities], ttl=self.ttl, online=self.online, @@ -86,11 +103,15 @@ def to_feature_view(self): tags=self.tags if self.tags else None, owner=self.owner, ) + feature_view.materialization_intervals = self.materialization_intervals return feature_view @classmethod - def from_feature_view(cls, feature_view): + def from_feature_view( + cls, + feature_view: FeatureView, + ) -> Self: # type: ignore """ Converts a FeatureView object to its pydantic model representation. 
@@ -126,11 +147,140 @@ def from_feature_view(cls, feature_view): for entity in feature_view.original_entities ], ttl=feature_view.ttl, - original_schema=feature_view.original_schema, + original_schema=[ + FieldModel.from_field(fv_schema) + for fv_schema in feature_view.original_schema + ] + if feature_view.original_schema is not None + else None, batch_source=batch_source, stream_source=stream_source, online=feature_view.online, description=feature_view.description, tags=feature_view.tags if feature_view.tags else None, owner=feature_view.owner, + materialization_intervals=feature_view.materialization_intervals, + ) + + +class FeatureViewProjectionModel(BaseModel): + """ + Pydantic Model of a Feast FeatureViewProjection. + """ + + name: str + name_alias: Optional[str] + features: List[FieldModel] + desired_features: List[str] + join_key_map: Dict[str, str] + + def to_feature_view_projection(self) -> FeatureViewProjection: + return FeatureViewProjection( + name=self.name, + name_alias=self.name_alias, + desired_features=self.desired_features, + features=[sch.to_field() for sch in self.features], + join_key_map=self.join_key_map, + ) + + @classmethod + def from_feature_view_projection( + cls, + feature_view_projection: FeatureViewProjection, + ) -> Self: # type: ignore + return cls( + name=feature_view_projection.name, + name_alias=feature_view_projection.name_alias, + desired_features=feature_view_projection.desired_features, + features=[ + FieldModel.from_field(feature) + for feature in feature_view_projection.features + ], + join_key_map=feature_view_projection.join_key_map, + ) + + +class OnDemandFeatureViewModel(BaseFeatureViewModel): + """ + Pydantic Model of a Feast OnDemandFeatureView. 
+ """ + + name: str + features: List[FieldModel] + source_feature_view_projections: Dict[str, FeatureViewProjectionModel] + source_request_sources: Dict[str, RequestSourceModel] + udf: str + udf_string: str + description: str + tags: Dict[str, str] + owner: str + + def to_feature_view(self) -> OnDemandFeatureView: + source_request_sources = dict() + if self.source_request_sources: + for key, feature_view_projection in self.source_request_sources.items(): + source_request_sources[key] = feature_view_projection.to_data_source() + + source_feature_view_projections = dict() + if self.source_feature_view_projections: + for ( + key, + feature_view_projection, + ) in self.source_feature_view_projections.items(): + source_feature_view_projections[ + key + ] = feature_view_projection.to_feature_view_projection() + + return OnDemandFeatureView( + name=self.name, + schema=[sch.to_field() for sch in self.features], + sources=list(source_feature_view_projections.values()) + + list(source_request_sources.values()), + udf=dill.loads(bytes.fromhex(self.udf)), + udf_string=self.udf_string, + description=self.description, + tags=self.tags, + owner=self.owner, + ) + + @classmethod + def from_feature_view( + cls, + on_demand_feature_view: OnDemandFeatureView, + ) -> Self: # type: ignore + source_request_sources = dict() + if on_demand_feature_view.source_request_sources: + for ( + key, + req_data_source, + ) in on_demand_feature_view.source_request_sources.items(): + source_request_sources[key] = RequestSourceModel.from_data_source( + req_data_source + ) + + source_feature_view_projections = dict() + if on_demand_feature_view.source_feature_view_projections: + for ( + key, + feature_view_projection, + ) in on_demand_feature_view.source_feature_view_projections.items(): + source_feature_view_projections[ + key + ] = FeatureViewProjectionModel.from_feature_view_projection( + feature_view_projection + ) + + return cls( + name=on_demand_feature_view.name, + features=[ + 
FieldModel.from_field(feature) + for feature in on_demand_feature_view.features + ], + source_feature_view_projections=source_feature_view_projections, + source_request_sources=source_request_sources, + udf=dill.dumps(on_demand_feature_view.udf, recurse=True).hex(), + udf_string=on_demand_feature_view.udf_string, + description=on_demand_feature_view.description, + tags=on_demand_feature_view.tags, + owner=on_demand_feature_view.owner, ) diff --git a/sdk/python/feast/expediagroup/pydantic_models/field_model.py b/sdk/python/feast/expediagroup/pydantic_models/field_model.py new file mode 100644 index 0000000000..d6d7a31a3b --- /dev/null +++ b/sdk/python/feast/expediagroup/pydantic_models/field_model.py @@ -0,0 +1,50 @@ +from typing import Dict, Optional, Union + +from pydantic import BaseModel +from typing_extensions import Self + +from feast import Field +from feast.types import Array, PrimitiveFeastType + + +class FieldModel(BaseModel): + """ + Pydantic Model of a Feast Field. + """ + + name: str + dtype: Union[Array, PrimitiveFeastType] + description: str = "" + tags: Optional[Dict[str, str]] = {} + + def to_field(self) -> Field: + """ + Given a Pydantic FieldModel, create and return a Field. + + Returns: + A Field. + """ + return Field( + name=self.name, + dtype=self.dtype, + description=self.description, + tags=self.tags, + ) + + @classmethod + def from_field( + cls, + field: Field, + ) -> Self: # type: ignore + """ + Converts a Field object to its pydantic FieldModel representation. + + Returns: + A FieldModel. 
+ """ + return cls( + name=field.name, + dtype=field.dtype, + description=field.description, + tags=field.tags, + ) diff --git a/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py b/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py new file mode 100644 index 0000000000..5a535f3662 --- /dev/null +++ b/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py @@ -0,0 +1,44 @@ +from datetime import datetime + +from pydantic import BaseModel +from typing_extensions import Self + +from feast.project_metadata import ProjectMetadata + + +class ProjectMetadataModel(BaseModel): + """ + Pydantic Model of a Feast ProjectMetadata. + """ + + project_name: str + project_uuid: str = "" + last_updated_timestamp: datetime = datetime.min + + def to_project_metadata(self) -> ProjectMetadata: + """ + Given a Pydantic ProjectMetadataModel, create and return a ProjectMetadata. + + Returns: + A ProjectMetadata. + """ + return ProjectMetadata( + project_name=self.project_name, + project_uuid=self.project_uuid, + ) + + @classmethod + def from_project_metadata( + cls, + project_metadata: ProjectMetadata, + ) -> Self: # type: ignore + """ + Converts a ProjectMetadata object to its pydantic ProjectMetadataModel representation. + + Returns: + A ProjectMetadataModel. 
+ """ + return cls( + project_name=project_metadata.project_name, + project_uuid=project_metadata.project_uuid, + ) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 6797aade8c..28e04d59cd 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -25,6 +25,7 @@ from feast.entity import Entity from feast.feature_view_projection import FeatureViewProjection from feast.field import Field +from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( FeatureViewMeta as FeatureViewMetaProto, @@ -139,7 +140,7 @@ def __init__( # making it impossible to convert idempotently to another format. # store these arguments to recover them in conversions. self.original_schema = schema - self.original_entities = entities or [] + self.original_entities: List[Entity] = entities or [] schema = schema or [] @@ -349,6 +350,10 @@ def to_proto(self) -> FeatureViewProto: stream_source_proto = self.stream_source.to_proto() stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" + original_entities: List[EntityProto] = [] + for entity in self.original_entities: + original_entities.append(entity.to_proto()) + spec = FeatureViewSpecProto( name=self.name, entities=self.entities, @@ -361,6 +366,7 @@ def to_proto(self) -> FeatureViewProto: online=self.online, batch_source=batch_source_proto, stream_source=stream_source_proto, + original_entities=original_entities, ) return FeatureViewProto(spec=spec, meta=meta) @@ -420,24 +426,31 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): # This avoids the deprecation warning. 
feature_view.entities = list(feature_view_proto.spec.entities) - feature_view.original_entities = feature_view_proto.spec.entities + feature_view.original_entities = [ + Entity.from_proto(entity) + for entity in feature_view_proto.spec.original_entities + ] # Instead of passing in a schema, we set the features and entity columns. feature_view.features = [ Field.from_proto(field_proto) for field_proto in feature_view_proto.spec.features ] + feature_view.entity_columns = [ Field.from_proto(field_proto) for field_proto in feature_view_proto.spec.entity_columns ] - if len(feature_view.entities) != len(feature_view.entity_columns): warnings.warn( f"There are some mismatches in your feature view's registered entities. Please check if you have applied your entities correctly." f"Entities: {feature_view.entities} vs Entity Columns: {feature_view.entity_columns}" ) + feature_view.original_schema = ( + feature_view.entity_columns + feature_view.features + ) + # FeatureViewProjections are not saved in the FeatureView proto. # Create the default projection. 
feature_view.projection = FeatureViewProjection.from_definition(feature_view) diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index 2960996a10..a570612b9f 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -53,7 +53,10 @@ def to_proto(self) -> FeatureViewProjectionProto: def from_proto(proto: FeatureViewProjectionProto): feature_view_projection = FeatureViewProjection( name=proto.feature_view_name, - name_alias=proto.feature_view_name_alias, + # TODO: It's a temporary fix to support conversion from feast models to pydantic models after it's registered in Registry + name_alias=proto.feature_view_name_alias + if proto.feature_view_name_alias != "" + else None, features=[], join_key_map=dict(proto.join_key_map), desired_features=[], diff --git a/sdk/python/feast/field.py b/sdk/python/feast/field.py index b2f6a5d250..c33ef03c59 100644 --- a/sdk/python/feast/field.py +++ b/sdk/python/feast/field.py @@ -12,20 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Callable, Dict, Optional +from typing import Dict, Optional from pydantic import BaseModel, validator from typeguard import check_type, typechecked from feast.feature import Feature from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2 as FieldProto -from feast.types import ( - ComplexFeastType, - FeastType, - PrimitiveFeastType, - from_string, - from_value_type, -) +from feast.types import FeastType, from_string, from_value_type from feast.value_type import ValueType @@ -49,10 +43,6 @@ class Field(BaseModel): class Config: arbitrary_types_allowed = True extra = "allow" - json_encoders: Dict[object, Callable] = { - ComplexFeastType: lambda v: str(v), - PrimitiveFeastType: lambda v: str(v), - } @validator("dtype", pre=True, always=True) def dtype_is_feasttype_or_string_feasttype(cls, v): diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 54ff7c9dc8..1f263d9f9d 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -4,7 +4,7 @@ from enum import Enum from pathlib import Path from threading import Lock -from typing import Any, Callable, List, Optional, Set, Union +from typing import Any, Callable, Dict, List, Optional, Set, Union from pydantic import StrictStr from sqlalchemy import ( # type: ignore @@ -34,6 +34,9 @@ SavedDatasetNotFound, ValidationReferenceNotFound, ) +from feast.expediagroup.pydantic_models.project_metadata_model import ( + ProjectMetadataModel, +) from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra @@ -1080,3 +1083,56 @@ def _get_all_projects(self) -> Set[str]: projects.add(row["project_id"]) return projects + + def get_all_project_metadata(self) -> List[ProjectMetadataModel]: + """ + Returns all projects metadata. No supporting function in SQL Registry so implemented this here instead of _get_all_projects. 
+ """ + project_metadata_model_dict: Dict[str, ProjectMetadataModel] = {} + with self.engine.connect() as conn: + stmt = select(feast_metadata) + rows = conn.execute(stmt).all() + if rows: + for row in rows: + project_id = row["project_id"] + metadata_key = row["metadata_key"] + metadata_value = row["metadata_value"] + + if project_id not in project_metadata_model_dict: + project_metadata_model_dict[project_id] = ProjectMetadataModel( + project_name=project_id + ) + + project_metadata_model: ProjectMetadataModel = ( + project_metadata_model_dict[project_id] + ) + if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: + project_metadata_model.project_uuid = metadata_value + + if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value: + project_metadata_model.last_updated_timestamp = metadata_value + return list(project_metadata_model_dict.values()) + + def get_project_metadata(self, project: str) -> ProjectMetadataModel: + """ + Returns given project metadata. No supporting function in SQL Registry so implemented this here rather than using _get_last_updated_metadata and list_project_metadata. 
+ """ + project_metadata_model: ProjectMetadataModel = ProjectMetadataModel( + project_name=project + ) + with self.engine.connect() as conn: + stmt = select(feast_metadata).where( + feast_metadata.c.project_id == project, + ) + rows = conn.execute(stmt).all() + if rows: + for row in rows: + metadata_key = row["metadata_key"] + metadata_value = row["metadata_value"] + + if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: + project_metadata_model.project_uuid = metadata_value + + if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value: + project_metadata_model.last_updated_timestamp = metadata_value + return project_metadata_model diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 3b4196c05b..9b930d58b6 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -15,7 +15,7 @@ from enum import Enum from typing import Dict, Union -from pydantic import BaseModel, validator +from pydantic import BaseModel from feast.value_type import ValueType @@ -103,18 +103,6 @@ def __hash__(self): UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP -SUPPORTED_BASE_TYPES = [ - Invalid, - String, - Bytes, - Bool, - Int32, - Int64, - Float32, - Float64, - UnixTimestamp, -] - PRIMITIVE_FEAST_TYPES_TO_STRING = { "INVALID": "Invalid", "STRING": "String", @@ -136,27 +124,11 @@ class Array(ComplexFeastType): base_type: The base type of the array. """ - base_type: Union[PrimitiveFeastType, ComplexFeastType] - - @validator("base_type", pre=True, always=True) - def base_type_is_supported(cls, base_type): - if base_type not in SUPPORTED_BASE_TYPES: - raise ValueError( - f"Type {type(base_type)} is currently not supported as a base type for Array." 
- ) - return base_type + base_type: PrimitiveFeastType - def __init__(self, base_type: Union[PrimitiveFeastType, ComplexFeastType]): + def __init__(self, base_type: PrimitiveFeastType): super(Array, self).__init__(base_type=base_type) - # def __init__(self, base_type: Union[PrimitiveFeastType, ComplexFeastType]): - # if base_type not in SUPPORTED_BASE_TYPES: - # raise ValueError( - # f"Type {type(base_type)} is currently not supported as a base type for Array." - # ) - - # self.base_type = base_type - def to_value_type(self) -> ValueType: assert isinstance(self.base_type, PrimitiveFeastType) value_type_name = PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES[self.base_type.name] diff --git a/sdk/python/tests/unit/test_pydantic_models.py b/sdk/python/tests/unit/test_pydantic_models.py index 8b84dd3d5a..2ceef000df 100644 --- a/sdk/python/tests/unit/test_pydantic_models.py +++ b/sdk/python/tests/unit/test_pydantic_models.py @@ -11,8 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import uuid +from datetime import datetime, timedelta from typing import List +import pandas as pd from pydantic import BaseModel from feast.data_source import RequestSource @@ -23,13 +26,61 @@ SparkSourceModel, ) from feast.expediagroup.pydantic_models.entity_model import EntityModel -from feast.expediagroup.pydantic_models.feature_view_model import FeatureViewModel +from feast.expediagroup.pydantic_models.feature_service import FeatureServiceModel +from feast.expediagroup.pydantic_models.feature_view_model import ( + FeatureViewModel, + FeatureViewProjectionModel, + OnDemandFeatureViewModel, +) +from feast.expediagroup.pydantic_models.field_model import FieldModel +from feast.expediagroup.pydantic_models.project_metadata_model import ( + ProjectMetadataModel, +) +from feast.feature_service import FeatureService from feast.feature_view import FeatureView +from feast.feature_view_projection import FeatureViewProjection from feast.field import Field from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, ) -from feast.types import Bool, Float32 +from feast.on_demand_feature_view import OnDemandFeatureView, on_demand_feature_view +from feast.project_metadata import ProjectMetadata +from feast.types import Array, Bool, Float32, Float64, Int64, PrimitiveFeastType, String +from feast.value_type import ValueType + + +def test_idempotent_field_primitive_type_conversion(): + python_obj = Field(name="val_to_add", dtype=Int64) + pydantic_obj = FieldModel.from_field(python_obj) + converted_python_obj = pydantic_obj.to_field() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = Field.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == FieldModel.parse_raw(pydantic_json) + + pydantic_dict = pydantic_obj.dict() + assert pydantic_obj == FieldModel.parse_obj(pydantic_dict) + + +def 
test_idempotent_field_complex_array_type_conversion(): + python_obj = Field(name="val_to_add", dtype=Array(Int64)) + pydantic_obj = FieldModel.from_field(python_obj) + converted_python_obj = pydantic_obj.to_field() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = Field.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == FieldModel.parse_raw(pydantic_json) + + pydantic_dict = pydantic_obj.dict() + assert pydantic_obj == FieldModel.parse_obj(pydantic_dict) def test_datasource_child_deserialization(): @@ -59,7 +110,14 @@ class Config: request_source_model_json = { "name": "source", "model_type": "RequestSourceModel", - "schema": [{"name": "string", "dtype": "Int32", "description": "", "tags": {}}], + "schema_": [ + { + "name": "string", + "dtype": PrimitiveFeastType.INT64, + "description": "", + "tags": {}, + } + ], "description": "desc", "tags": {}, "owner": "feast", @@ -78,14 +136,25 @@ class Config: def test_idempotent_entity_conversion(): - entity = Entity( + python_obj = Entity( name="my-entity", description="My entity", + value_type=ValueType.INT64, tags={"key1": "val1", "key2": "val2"}, ) - entity_model = EntityModel.from_entity(entity) - entity_b = entity_model.to_entity() - assert entity == entity_b + pydantic_obj = EntityModel.from_entity(python_obj) + converted_python_obj = pydantic_obj.to_entity() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = Entity.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == EntityModel.parse_raw(pydantic_json) + + pydantic_dict = pydantic_obj.dict() + assert pydantic_obj == EntityModel.parse_obj(pydantic_dict) def test_idempotent_requestsource_conversion(): @@ -93,29 +162,49 @@ def test_idempotent_requestsource_conversion(): 
Field(name="f1", dtype=Float32), Field(name="f2", dtype=Bool), ] - request_source = RequestSource( + python_obj = RequestSource( name="source", schema=schema, description="desc", tags={}, owner="feast", ) - request_source_model = RequestSourceModel.from_data_source(request_source) - request_source_b = request_source_model.to_data_source() - assert request_source == request_source_b + pydantic_obj = RequestSourceModel.from_data_source(python_obj) + converted_python_obj = pydantic_obj.to_data_source() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = RequestSource.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == RequestSourceModel.parse_raw(pydantic_json) + + pydantic_json = pydantic_obj.dict() + assert pydantic_obj == RequestSourceModel.parse_obj(pydantic_json) def test_idempotent_sparksource_conversion(): - spark_source = SparkSource( + python_obj = SparkSource( name="source", table="thingy", description="desc", tags={}, owner="feast", ) - spark_source_model = SparkSourceModel.from_data_source(spark_source) - spark_source_b = spark_source_model.to_data_source() - assert spark_source == spark_source_b + pydantic_obj = SparkSourceModel.from_data_source(python_obj) + converted_python_obj = pydantic_obj.to_data_source() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = SparkSource.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == SparkSourceModel.parse_raw(pydantic_json) + + pydantic_json = pydantic_obj.dict() + assert pydantic_obj == SparkSourceModel.parse_obj(pydantic_json) def test_idempotent_featureview_conversion(): @@ -123,7 +212,9 @@ def test_idempotent_featureview_conversion(): Field(name="f1", dtype=Float32), Field(name="f2", dtype=Bool), ] - user_entity = 
Entity(name="user1", join_keys=["user_id"])
+    user_entity = Entity(
+        name="user1", join_keys=["user_id"], value_type=ValueType.INT64
+    )
     request_source = RequestSource(
         name="source",
         schema=schema,
@@ -135,10 +226,12 @@ def test_idempotent_featureview_conversion():
         name="my-feature-view",
         entities=[user_entity],
         schema=[
+            Field(name="user1", dtype=Int64),
             Field(name="feature1", dtype=Float32),
             Field(name="feature2", dtype=Float32),
         ],
         source=request_source,
+        ttl=timedelta(days=0),
     )
     feature_view_model = FeatureViewModel.from_feature_view(feature_view)
     feature_view_b = feature_view_model.to_feature_view()
@@ -151,7 +244,7 @@ def test_idempotent_featureview_conversion():
         timestamp_field="event_timestamp",
         created_timestamp_column="created",
     )
-    feature_view = FeatureView(
+    python_obj = FeatureView(
         name="my-feature-view",
         entities=[user_entity],
         schema=[
@@ -160,6 +253,307 @@ def test_idempotent_featureview_conversion():
         source=spark_source,
     )
+    python_obj.materialization_intervals = [
+        (datetime.now() - timedelta(days=10), datetime.now() - timedelta(days=9)),
+        (datetime.now(), datetime.now()),
+    ]
+    pydantic_obj = FeatureViewModel.from_feature_view(python_obj)
+    converted_python_obj = pydantic_obj.to_feature_view()
+    assert python_obj == converted_python_obj
+
+    feast_proto = converted_python_obj.to_proto()
+    python_obj_from_proto = FeatureView.from_proto(feast_proto)
+    assert python_obj == python_obj_from_proto
+
+    pydantic_json = pydantic_obj.json()
+    assert pydantic_obj == FeatureViewModel.parse_raw(pydantic_json)
+
+    pydantic_json = pydantic_obj.dict()
+    assert pydantic_obj == FeatureViewModel.parse_obj(pydantic_json)
+
+
+def test_idempotent_feature_view_projection_conversion():
+    # Feast does not use desired_features when converting to proto
+    python_obj = FeatureViewProjection(
+        name="example_projection",
+        name_alias="alias",
+        desired_features=[],
+        features=[
+            Field(name="feature1", dtype=Float64),
+            Field(name="feature2", dtype=String),
+        ],
+        join_key_map={"old_key": "new_key"},
+    )
+    pydantic_obj = FeatureViewProjectionModel.from_feature_view_projection(python_obj)
+    converted_python_obj = pydantic_obj.to_feature_view_projection()
+    assert python_obj == converted_python_obj
+
+    feast_proto = converted_python_obj.to_proto()
+    python_obj_from_proto = FeatureViewProjection.from_proto(feast_proto)
+    assert python_obj == python_obj_from_proto
+
+    pydantic_json = pydantic_obj.json()
+    assert pydantic_obj == FeatureViewProjectionModel.parse_raw(pydantic_json)
+
+    pydantic_json = pydantic_obj.dict()
+    assert pydantic_obj == FeatureViewProjectionModel.parse_obj(pydantic_json)
+
+
+def test_idempotent_on_demand_feature_view_conversion():
+    tags = {
+        "tag1": "val1",
+        "tag2": "val2",
+        "tag3": "val3",
+    }
+
+    """
+    Entity is a collection of semantically related features.
+    """
+    entity1: Entity = Entity(
+        name="entity1",
+        description="entity1",
+        value_type=ValueType.INT64,
+        owner="x@xyz.com",
+        tags=tags,
+    )
+
+    entity2: Entity = Entity(
+        name="entity2",
+        description="entity2",
+        value_type=ValueType.INT64,
+        owner="x@zyz.com",
+        tags=tags,
+    )
+
+    """
+    Data source refers to raw feature data that users own. Feature Store
+    does not manage any of the raw underlying data but instead oversees
+    loading this data and performing different operations on
+    the data to retrieve or serve features.
+
+    Feast uses a time-series data model to represent data.
+ """ + + datasource1: SparkSource = SparkSource( + name="datasource1", + description="datasource1", + query="""select entity1 + , val1 + , val2 + , val3 + , val4 + , val5 + , CURRENT_DATE AS event_timestamp + from table1 + WHERE entity1 < 100000""", + timestamp_field="event_timestamp", + tags=tags, + owner="x@xyz.com", + ) + + datasource2: SparkSource = SparkSource( + name="datasource2", + description="datasource2", + path="s3a://test-bucket/path1/datasource2", + file_format="parquet", + timestamp_field="event_timestamp", + tags=tags, + owner="x@xyz.com", + ) + + """ + A feature view is an object that represents a logical group + of time-series feature data as it is found in a data source. + """ + + view1: FeatureView = FeatureView( + name="view1", + entities=[entity1], + ttl=timedelta(days=365), + source=datasource1, + tags=tags, + description="view1", + owner="x@xyz.com", + schema=[ + Field(name="val1", dtype=String), + Field(name="val2", dtype=String), + Field(name="val3", dtype=Float64), + Field(name="val4", dtype=Float64), + Field(name="val5", dtype=String), + ], + ) + + view2: FeatureView = FeatureView( + name="view2", + entities=[entity2], + ttl=timedelta(days=365), + source=datasource2, + tags=tags, + description="view2", + owner="x@xyz.com", + schema=[ + Field(name="r1", dtype=Float64), + Field(name="r2", dtype=Float64), + Field(name="r3", dtype=String), + ], + ) + + distance_decorator = on_demand_feature_view( + sources=[view1, view2], + schema=[Field(name="distance_in_kms", dtype=Float64)], + ) + + def calculate_distance_demo_go(features_df: pd.DataFrame) -> pd.DataFrame: + import numpy as np + + df = pd.DataFrame() + # Haversine formula + # Radius of earth in kilometers. 
Use 3956 for miles + r = 6371 + + # calculate the result + df["distance_in_kms"] = ( + 2 + * np.arcsin( + np.sqrt( + np.sin( + ( + np.radians(features_df["val3"]) + - np.radians(features_df["r1"]) + ) + / 2 + ) + ** 2 + + np.cos(np.radians(features_df["r1"])) + * np.cos(np.radians(features_df["val3"])) + * np.sin( + ( + np.radians(features_df["val4"]) + - np.radians(features_df["r2"]) + ) + / 2 + ) + ** 2 + ) + ) + * r + ) + + return df + + python_obj = distance_decorator(calculate_distance_demo_go) + pydantic_obj = OnDemandFeatureViewModel.from_feature_view(python_obj) + converted_python_obj = pydantic_obj.to_feature_view() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = OnDemandFeatureView.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == OnDemandFeatureViewModel.parse_raw(pydantic_json) + + pydantic_json = pydantic_obj.dict() + assert pydantic_obj == OnDemandFeatureViewModel.parse_obj(pydantic_json) + + +def test_idempotent_feature_service_conversion(): + schema = [ + Field(name="f1", dtype=Float32), + Field(name="f2", dtype=Bool), + ] + user_entity = Entity(name="user1", join_keys=["user_id"]) + request_source = RequestSource( + name="source", + schema=schema, + description="desc", + tags={}, + owner="feast", + ) + feature_view = FeatureView( + name="my-feature-view", + entities=[user_entity], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=request_source, + ttl=timedelta(days=0), + ) feature_view_model = FeatureViewModel.from_feature_view(feature_view) feature_view_b = feature_view_model.to_feature_view() assert feature_view == feature_view_b + + spark_source = SparkSource( + name="sparky_sparky_boom_man", + path="/data/driver_hourly_stats", + file_format="parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + 
view1 = FeatureView( + name="my-feature-view1", + entities=[user_entity], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=spark_source, + ) + + spark_source_2 = SparkSource( + name="sparky_sparky_boom_man2", + path="/data/driver_hourly_stats", + file_format="parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + view2 = FeatureView( + name="my-feature-view2", + entities=[user_entity], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=spark_source_2, + ) + + python_obj = FeatureService( + name="feature_service_1", + description="Helps to retrieve features from view1 and view2", + owner="bdodla@expediagroup.com", + tags={}, + features=[view1, view2[["feature1"]]], + ) + pydantic_obj = FeatureServiceModel.from_feature_service(python_obj) + converted_python_obj = pydantic_obj.to_feature_service() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = FeatureService.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json() + assert pydantic_obj == FeatureServiceModel.parse_raw(pydantic_json) + + pydantic_json = pydantic_obj.dict() + assert pydantic_obj == FeatureServiceModel.parse_obj(pydantic_json) + + +def test_idempotent_project_metadata_conversion(): + python_obj = ProjectMetadata( + project_name="test_project", project_uuid=f"{uuid.uuid4()}" + ) + pydantic_obj = ProjectMetadataModel.from_project_metadata(python_obj) + converted_python_obj = pydantic_obj.to_project_metadata() + assert python_obj == converted_python_obj + + feast_proto = converted_python_obj.to_proto() + python_obj_from_proto = ProjectMetadata.from_proto(feast_proto) + assert python_obj == python_obj_from_proto + + pydantic_json = pydantic_obj.json(exclude={"last_updated_timestamp"}) + assert pydantic_obj == 
ProjectMetadataModel.parse_raw(pydantic_json) + + pydantic_json = pydantic_obj.dict(exclude={"last_updated_timestamp"}) + assert pydantic_obj == ProjectMetadataModel.parse_obj(pydantic_json) diff --git a/sdk/python/tests/unit/test_types.py b/sdk/python/tests/unit/test_types.py index af490b4f3a..a45e83e218 100644 --- a/sdk/python/tests/unit/test_types.py +++ b/sdk/python/tests/unit/test_types.py @@ -1,4 +1,5 @@ import pytest +from pydantic.error_wrappers import ValidationError from feast.types import Array, Float32, String, from_value_type from feast.value_type import ValueType @@ -20,10 +21,10 @@ def test_array_feast_type(): assert array_float_32.to_value_type() == ValueType.FLOAT_LIST assert from_value_type(array_float_32.to_value_type()) == array_float_32 - with pytest.raises(ValueError): + with pytest.raises(ValidationError): _ = Array(Array) - with pytest.raises(ValueError): + with pytest.raises(ValidationError): _ = Array(Array(String))