diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 8b4b0953ac..66187d9a00 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -27,6 +27,7 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" + jsonlog "github.com/rs/zerolog/log" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -65,6 +66,7 @@ type LoggingOptions struct { func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService { repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig) if err != nil { + jsonlog.Error().Stack().Err(err).Msg("Failed to convert to RepoConfig") return &OnlineFeatureService{ err: err, } @@ -72,6 +74,7 @@ func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCal fs, err := feast.NewFeatureStore(repoConfig, transformationCallback) if err != nil { + jsonlog.Error().Stack().Err(err).Msg("Failed to create NewFeatureStore") return &OnlineFeatureService{ err: err, } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 4aa7870350..00389feb78 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -3,7 +3,6 @@ package feast import ( "context" "errors" - "fmt" "github.com/apache/arrow/go/v8/arrow/memory" @@ -53,7 +52,6 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf } err = registry.InitializeRegistry() if err != nil { - fmt.Println("ERROR: Unable to Initialize Registry: ", err) return nil, err } diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index 1f7bbbaab7..4d6896153d 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -8,6 +8,7 @@ import ( "time" "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/rs/zerolog/log" "github.com/feast-dev/feast/go/protos/feast/core" ) @@ -70,7 +71,7 @@ func (r *Registry) InitializeRegistry() error { _, err := r.getRegistryProto() if err != nil { if _, ok := r.registryStore.(*HttpRegistryStore); ok { - fmt.Printf("[%s] %s %s\n", time.UTC.String(), "ERROR: Registry Initialization Failed: ", err) + log.Error().Err(err).Msg("Registry Initialization Failed") return err } registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION} @@ -85,7 +86,7 @@ func (r *Registry) RefreshRegistryOnInterval() { for ; true; <-ticker.C { err := r.refresh() if err != nil { - fmt.Printf("[%s] %s %s\n", time.UTC.String(), "ERROR: Failed to refresh Registry: ", err) + log.Error().Stack().Err(err).Msg("Registry refresh Failed") return } } diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 451b16dd52..0d2e57437a 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -322,6 +322,7 @@ func (s *httpServer) Serve(host string, port int) error { if err == http.ErrServerClosed { return nil } + log.Error().Stack().Err(err).Msg("Startup failed") return err } diff --git a/protos/feast/core/Registry.proto b/protos/feast/core/Registry.proto index 7d80d8c837..1a21ba37cb 100644 --- a/protos/feast/core/Registry.proto +++ b/protos/feast/core/Registry.proto @@ -58,4 +58,6 @@ message Registry { message ProjectMetadata { string project = 1; string project_uuid = 2; + google.protobuf.Timestamp last_updated_timestamp = 3; + } diff --git a/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py b/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py index bd589f50e9..fab4bd0a82 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/project_metadata_model.py @@ -25,6 +25,7 @@ def to_project_metadata(self) -> ProjectMetadata: return ProjectMetadata( project_name=self.project_name, project_uuid=self.project_uuid, + last_updated_timestamp=self.last_updated_timestamp, ) @classmethod @@ -41,4 +42,5 @@ def from_project_metadata( return cls( project_name=project_metadata.project_name, project_uuid=project_metadata.project_uuid, + last_updated_timestamp=project_metadata.last_updated_timestamp, ) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 03e0ef5955..12b5db5fc2 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -90,7 +90,7 @@ def __init__( if registry_config.cache_ttl_seconds is not None else 0 ) - self.cached_registry_proto = self.proto(allow_cache=False) + self.cached_registry_proto = self.proto() self.stop_thread = False self.refresh_cache_thread = threading.Thread(target=self._refresh_cache) self.refresh_cache_thread.daemon = True @@ -176,7 +176,7 @@ def get_entity( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/entities/{name}" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) return EntityModel.parse_obj(response_data).to_entity() except EntityNotFoundException as exception: @@ -199,7 +199,7 @@ def list_entities( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/entities" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -256,7 +256,7 @@ def get_data_source( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": @@ -286,7 +286,7 @@ def list_data_sources( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/data_sources" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] data_source_list = [] @@ -345,7 +345,7 @@ def get_feature_service( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/feature_services/{name}" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: @@ -366,7 +366,7 @@ def list_feature_services( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/feature_services" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -428,7 +428,7 @@ def get_feature_view( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: @@ -449,7 +449,7 @@ def list_feature_views( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/feature_views" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -469,7 +469,7 @@ def get_on_demand_feature_view( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: @@ -490,7 +490,7 @@ def list_on_demand_feature_views( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views" - params = {"allow_cache": False} + params = {"allow_cache": True} response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -626,7 +626,7 @@ def list_validation_references( def proto(self, allow_cache: bool = True) -> RegistryProto: r = RegistryProto() - last_updated_timestamps = [] + # last_updated_timestamps = [] if self.project is None: projects = self._get_all_projects() else: @@ -645,7 +645,7 @@ def proto(self, allow_cache: bool = True) -> RegistryProto: (self.list_validation_references, r.validation_references), (self.list_project_metadata, r.project_metadata), ]: - objs: List[Any] = lister(project, allow_cache) # type: ignore + objs: List[Any] = lister(project) # type: ignore if objs: obj_protos = [obj.to_proto() for obj in objs] for obj_proto in obj_protos: @@ -658,10 +658,11 @@ def proto(self, allow_cache: bool = True) -> RegistryProto: # This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783, # the registry proto only has a single infra field, which we're currently setting as the "last" project. r.infra.CopyFrom(self.get_infra(project).to_proto()) - last_updated_timestamps.append(self._get_last_updated_metadata(project)) + # last_updated_timestamps.append(self._get_last_updated_metadata(project)) - if last_updated_timestamps: - r.last_updated.FromDatetime(max(last_updated_timestamps)) + # if last_updated_timestamps: + # r.last_updated.FromDatetime(max(last_updated_timestamps)) + r.last_updated.FromDatetime(datetime.utcnow()) return r @@ -681,7 +682,7 @@ def refresh(self, project: Optional[str] = None): self.cached_registry_proto, project ) - refreshed_cache_registry_proto = self.proto(True) + refreshed_cache_registry_proto = self.proto() with self._refresh_lock: self.cached_registry_proto = refreshed_cache_registry_proto self.cached_registry_proto_created = datetime.utcnow() @@ -754,7 +755,8 @@ def list_project_metadata( # type: ignore[return] ) try: url = f"{self.base_url}/projects/{project}" - response_data = self._send_request("GET", url) + params = {"allow_cache": True} + response_data = self._send_request("GET", url, params=params) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: logger.error( diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 372a344c8a..4b64dfc3bb 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -731,7 +731,13 @@ def list_project_metadata( for row in rows: if row["metadata_key"] == FeastMetadataKeys.PROJECT_UUID.value: project_metadata.project_uuid = row["metadata_value"] - break + + if ( + row["metadata_key"] + == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value + ): + project_metadata.last_updated_timestamp = row["metadata_value"] + # TODO(adchia): Add other project metadata in a structured way return [project_metadata] return [] @@ -906,10 +912,10 @@ def get_user_metadata( def proto(self) -> RegistryProto: r = RegistryProto() - last_updated_timestamps = [] + # last_updated_timestamps = [] def process_project(project): - nonlocal r, last_updated_timestamps + nonlocal r # , last_updated_timestamps for lister, registry_proto_field in [ (self.list_entities, r.entities), (self.list_feature_views, r.feature_views), @@ -935,7 +941,9 @@ def process_project(project): # This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783, # the registry proto only has a single infra field, which we're currently setting as the "last" project. r.infra.CopyFrom(self.get_infra(project).to_proto()) - last_updated_timestamps.append(self._get_last_updated_metadata(project)) + + # This is helping to find last updated metadata for project and its not being used anywhere so commenting this process + # last_updated_timestamps.append(self._get_last_updated_metadata(project)) if self.project is None: projects = self._get_all_projects() @@ -948,8 +956,11 @@ def process_project(project): ) as executor: # Adjust max_workers as needed executor.map(process_project, projects) - if last_updated_timestamps: - r.last_updated.FromDatetime(max(last_updated_timestamps)) + # This logic is calculating the max projects updated time. Not used anywhere. Just setting to current timestamp + # if last_updated_timestamps: + # r.last_updated.FromDatetime(max(last_updated_timestamps)) + + r.last_updated.FromDatetime(datetime.utcnow()) return r @@ -1014,6 +1025,7 @@ def _apply_object( self._set_last_updated_metadata(update_datetime, project) def create_project_if_not_exists(self, project): + new_project = False # Initialize project metadata if needed with self.engine.connect() as conn: update_datetime = datetime.utcnow() @@ -1036,6 +1048,9 @@ def create_project_if_not_exists(self, project): insert_stmt = insert(feast_metadata).values(values) conn.execute(insert_stmt) usage.set_current_project_uuid(new_project_uuid) + new_project = True + + if new_project: self._set_last_updated_metadata(update_datetime, project) def _delete_object( @@ -1198,10 +1213,25 @@ def get_all_project_metadata(self) -> List[ProjectMetadataModel]: ) return list(project_metadata_model_dict.values()) - def get_project_metadata(self, project: str) -> ProjectMetadataModel: + def get_project_metadata( + self, + project: str, + allow_cache: bool = False, + ) -> ProjectMetadataModel: """ Returns given project metdata. No supporting function in SQL Registry so implemented this here rather than using _get_last_updated_metadata and list_project_metadata. """ + + if allow_cache: + self._check_if_registry_refreshed() + project_metadata_proto = proto_registry_utils.get_project_metadata( + self.cached_registry_proto, project + ) + if project_metadata_proto is not None: + return ProjectMetadataModel.from_project_metadata( + ProjectMetadata.from_proto(project_metadata_proto) + ) + project_metadata_model: ProjectMetadataModel = ProjectMetadataModel( project_name=project ) diff --git a/sdk/python/feast/project_metadata.py b/sdk/python/feast/project_metadata.py index 829e9ff0d5..3743d7e84b 100644 --- a/sdk/python/feast/project_metadata.py +++ b/sdk/python/feast/project_metadata.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import uuid +from datetime import datetime from typing import Optional from google.protobuf.json_format import MessageToJson @@ -33,6 +34,7 @@ class ProjectMetadata: project_name: str project_uuid: str + last_updated_timestamp: datetime @log_exceptions def __init__( @@ -40,6 +42,7 @@ def __init__( *args, project_name: Optional[str] = None, project_uuid: Optional[str] = None, + last_updated_timestamp: datetime = datetime.utcfromtimestamp(1), ): """ Creates an Project metadata object. @@ -56,6 +59,7 @@ def __init__( self.project_name = project_name self.project_uuid = project_uuid or f"{uuid.uuid4()}" + self.last_updated_timestamp = last_updated_timestamp def __hash__(self) -> int: return hash((self.project_name, self.project_uuid)) @@ -69,6 +73,7 @@ def __eq__(self, other): if ( self.project_name != other.project_name or self.project_uuid != other.project_uuid + or self.last_updated_timestamp != other.last_updated_timestamp ): return False @@ -94,6 +99,7 @@ def from_proto(cls, project_metadata_proto: ProjectMetadataProto): entity = cls( project_name=project_metadata_proto.project, project_uuid=project_metadata_proto.project_uuid, + last_updated_timestamp=project_metadata_proto.last_updated_timestamp.ToDatetime(), ) return entity @@ -106,6 +112,11 @@ def to_proto(self) -> ProjectMetadataProto: An ProjectMetadataProto protobuf. """ - return ProjectMetadataProto( - project=self.project_name, project_uuid=self.project_uuid + project_metadata_proto = ProjectMetadataProto( + project=self.project_name, + project_uuid=self.project_uuid, ) + project_metadata_proto.last_updated_timestamp.FromDatetime( + self.last_updated_timestamp + ) + return project_metadata_proto diff --git a/sdk/python/tests/unit/test_pydantic_models.py b/sdk/python/tests/unit/test_pydantic_models.py index 2ceef000df..5853443cb0 100644 --- a/sdk/python/tests/unit/test_pydantic_models.py +++ b/sdk/python/tests/unit/test_pydantic_models.py @@ -542,7 +542,9 @@ def test_idempotent_feature_service_conversion(): def test_idempotent_project_metadata_conversion(): python_obj = ProjectMetadata( - project_name="test_project", project_uuid=f"{uuid.uuid4()}" + project_name="test_project", + project_uuid=f"{uuid.uuid4()}", + last_updated_timestamp=datetime.utcnow(), ) pydantic_obj = ProjectMetadataModel.from_project_metadata(python_obj) converted_python_obj = pydantic_obj.to_project_metadata() @@ -552,8 +554,8 @@ def test_idempotent_project_metadata_conversion(): python_obj_from_proto = ProjectMetadata.from_proto(feast_proto) assert python_obj == python_obj_from_proto - pydantic_json = pydantic_obj.json(exclude={"last_updated_timestamp"}) + pydantic_json = pydantic_obj.json() assert pydantic_obj == ProjectMetadataModel.parse_raw(pydantic_json) - pydantic_json = pydantic_obj.dict(exclude={"last_updated_timestamp"}) + pydantic_json = pydantic_obj.dict() assert pydantic_obj == ProjectMetadataModel.parse_obj(pydantic_json)