Skip to content

Commit

Permalink
fix: Added caching to get project calls (#50)
Browse files Browse the repository at this point in the history
* fix: Added caching to get project calls, leverage http registry cache for refresh in http.py

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Oct 5, 2023
1 parent 5e7d144 commit 81edb15
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 34 deletions.
3 changes: 3 additions & 0 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
jsonlog "github.com/rs/zerolog/log"
"google.golang.org/grpc/health/grpc_health_v1"
)

Expand Down Expand Up @@ -65,13 +66,15 @@ type LoggingOptions struct {
func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
if err != nil {
jsonlog.Error().Stack().Err(err).Msg("Failed to convert to RepoConfig")
return &OnlineFeatureService{
err: err,
}
}

fs, err := feast.NewFeatureStore(repoConfig, transformationCallback)
if err != nil {
jsonlog.Error().Stack().Err(err).Msg("Failed to create NewFeatureStore")
return &OnlineFeatureService{
err: err,
}
Expand Down
2 changes: 0 additions & 2 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package feast
import (
"context"
"errors"
"fmt"

"github.com/apache/arrow/go/v8/arrow/memory"

Expand Down Expand Up @@ -53,7 +52,6 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf
}
err = registry.InitializeRegistry()
if err != nil {
fmt.Println("ERROR: Unable to Initialize Registry: ", err)
return nil, err
}

Expand Down
5 changes: 3 additions & 2 deletions go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/rs/zerolog/log"

"github.com/feast-dev/feast/go/protos/feast/core"
)
Expand Down Expand Up @@ -70,7 +71,7 @@ func (r *Registry) InitializeRegistry() error {
_, err := r.getRegistryProto()
if err != nil {
if _, ok := r.registryStore.(*HttpRegistryStore); ok {
fmt.Printf("[%s] %s %s\n", time.UTC.String(), "ERROR: Registry Initialization Failed: ", err)
log.Error().Err(err).Msg("Registry Initialization Failed")
return err
}
registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION}
Expand All @@ -85,7 +86,7 @@ func (r *Registry) RefreshRegistryOnInterval() {
for ; true; <-ticker.C {
err := r.refresh()
if err != nil {
fmt.Printf("[%s] %s %s\n", time.UTC.String(), "ERROR: Failed to refresh Registry: ", err)
log.Error().Stack().Err(err).Msg("Registry refresh Failed")
return
}
}
Expand Down
1 change: 1 addition & 0 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func (s *httpServer) Serve(host string, port int) error {
if err == http.ErrServerClosed {
return nil
}
log.Error().Stack().Err(err).Msg("Startup failed")
return err
}

Expand Down
2 changes: 2 additions & 0 deletions protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ message Registry {
// ProjectMetadata groups the metadata fields recorded for a single project:
// the project string, its UUID, and a last-updated timestamp.
message ProjectMetadata {
// Project string.
// NOTE(review): presumably the project name (the Python side maps a
// project_name attribute) — confirm.
string project = 1;
// UUID string associated with the project.
string project_uuid = 2;
// Last-updated timestamp for the project.
// NOTE(review): presumably sourced from the registry's last-updated
// metadata entry for the project — confirm against the writers.
google.protobuf.Timestamp last_updated_timestamp = 3;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def to_project_metadata(self) -> ProjectMetadata:
return ProjectMetadata(
project_name=self.project_name,
project_uuid=self.project_uuid,
last_updated_timestamp=self.last_updated_timestamp,
)

@classmethod
Expand All @@ -41,4 +42,5 @@ def from_project_metadata(
return cls(
project_name=project_metadata.project_name,
project_uuid=project_metadata.project_uuid,
last_updated_timestamp=project_metadata.last_updated_timestamp,
)
38 changes: 20 additions & 18 deletions sdk/python/feast/infra/registry/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(
if registry_config.cache_ttl_seconds is not None
else 0
)
self.cached_registry_proto = self.proto(allow_cache=False)
self.cached_registry_proto = self.proto()
self.stop_thread = False
self.refresh_cache_thread = threading.Thread(target=self._refresh_cache)
self.refresh_cache_thread.daemon = True
Expand Down Expand Up @@ -176,7 +176,7 @@ def get_entity( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/entities/{name}"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
return EntityModel.parse_obj(response_data).to_entity()
except EntityNotFoundException as exception:
Expand All @@ -199,7 +199,7 @@ def list_entities( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/entities"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
response_list = response_data if isinstance(response_data, list) else []
return [
Expand Down Expand Up @@ -256,7 +256,7 @@ def get_data_source( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/data_sources/{name}"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
if "model_type" in response_data:
if response_data["model_type"] == "RequestSourceModel":
Expand Down Expand Up @@ -286,7 +286,7 @@ def list_data_sources( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/data_sources"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
response_list = response_data if isinstance(response_data, list) else []
data_source_list = []
Expand Down Expand Up @@ -345,7 +345,7 @@ def get_feature_service( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/feature_services/{name}"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
return FeatureServiceModel.parse_obj(response_data).to_feature_service()
except FeatureServiceNotFoundException as exception:
Expand All @@ -366,7 +366,7 @@ def list_feature_services( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/feature_services"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
response_list = response_data if isinstance(response_data, list) else []
return [
Expand Down Expand Up @@ -428,7 +428,7 @@ def get_feature_view( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/feature_views/{name}"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
return FeatureViewModel.parse_obj(response_data).to_feature_view()
except FeatureViewNotFoundException as exception:
Expand All @@ -449,7 +449,7 @@ def list_feature_views( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/feature_views"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
response_list = response_data if isinstance(response_data, list) else []
return [
Expand All @@ -469,7 +469,7 @@ def get_on_demand_feature_view( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view()
except FeatureViewNotFoundException as exception:
Expand All @@ -490,7 +490,7 @@ def list_on_demand_feature_views( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}/on_demand_feature_views"
params = {"allow_cache": False}
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
response_list = response_data if isinstance(response_data, list) else []
return [
Expand Down Expand Up @@ -626,7 +626,7 @@ def list_validation_references(

def proto(self, allow_cache: bool = True) -> RegistryProto:
r = RegistryProto()
last_updated_timestamps = []
# last_updated_timestamps = []
if self.project is None:
projects = self._get_all_projects()
else:
Expand All @@ -645,7 +645,7 @@ def proto(self, allow_cache: bool = True) -> RegistryProto:
(self.list_validation_references, r.validation_references),
(self.list_project_metadata, r.project_metadata),
]:
objs: List[Any] = lister(project, allow_cache) # type: ignore
objs: List[Any] = lister(project) # type: ignore
if objs:
obj_protos = [obj.to_proto() for obj in objs]
for obj_proto in obj_protos:
Expand All @@ -658,10 +658,11 @@ def proto(self, allow_cache: bool = True) -> RegistryProto:
# This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783,
# the registry proto only has a single infra field, which we're currently setting as the "last" project.
r.infra.CopyFrom(self.get_infra(project).to_proto())
last_updated_timestamps.append(self._get_last_updated_metadata(project))
# last_updated_timestamps.append(self._get_last_updated_metadata(project))

if last_updated_timestamps:
r.last_updated.FromDatetime(max(last_updated_timestamps))
# if last_updated_timestamps:
# r.last_updated.FromDatetime(max(last_updated_timestamps))
r.last_updated.FromDatetime(datetime.utcnow())

return r

Expand All @@ -681,7 +682,7 @@ def refresh(self, project: Optional[str] = None):
self.cached_registry_proto, project
)

refreshed_cache_registry_proto = self.proto(True)
refreshed_cache_registry_proto = self.proto()
with self._refresh_lock:
self.cached_registry_proto = refreshed_cache_registry_proto
self.cached_registry_proto_created = datetime.utcnow()
Expand Down Expand Up @@ -754,7 +755,8 @@ def list_project_metadata( # type: ignore[return]
)
try:
url = f"{self.base_url}/projects/{project}"
response_data = self._send_request("GET", url)
params = {"allow_cache": True}
response_data = self._send_request("GET", url, params=params)
return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()]
except ProjectMetadataNotFoundException as exception:
logger.error(
Expand Down
44 changes: 37 additions & 7 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,13 @@ def list_project_metadata(
for row in rows:
if row["metadata_key"] == FeastMetadataKeys.PROJECT_UUID.value:
project_metadata.project_uuid = row["metadata_value"]
break

if (
row["metadata_key"]
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value
):
project_metadata.last_updated_timestamp = row["metadata_value"]

# TODO(adchia): Add other project metadata in a structured way
return [project_metadata]
return []
Expand Down Expand Up @@ -906,10 +912,10 @@ def get_user_metadata(

def proto(self) -> RegistryProto:
r = RegistryProto()
last_updated_timestamps = []
# last_updated_timestamps = []

def process_project(project):
nonlocal r, last_updated_timestamps
nonlocal r # , last_updated_timestamps
for lister, registry_proto_field in [
(self.list_entities, r.entities),
(self.list_feature_views, r.feature_views),
Expand All @@ -935,7 +941,9 @@ def process_project(project):
# This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783,
# the registry proto only has a single infra field, which we're currently setting as the "last" project.
r.infra.CopyFrom(self.get_infra(project).to_proto())
last_updated_timestamps.append(self._get_last_updated_metadata(project))

# This computes the last-updated metadata for the project, but the result is not used anywhere, so it is commented out.
# last_updated_timestamps.append(self._get_last_updated_metadata(project))

if self.project is None:
projects = self._get_all_projects()
Expand All @@ -948,8 +956,11 @@ def process_project(project):
) as executor: # Adjust max_workers as needed
executor.map(process_project, projects)

if last_updated_timestamps:
r.last_updated.FromDatetime(max(last_updated_timestamps))
# This logic calculated the maximum last-updated time across projects. It is not used anywhere, so last_updated is simply set to the current timestamp.
# if last_updated_timestamps:
# r.last_updated.FromDatetime(max(last_updated_timestamps))

r.last_updated.FromDatetime(datetime.utcnow())

return r

Expand Down Expand Up @@ -1014,6 +1025,7 @@ def _apply_object(
self._set_last_updated_metadata(update_datetime, project)

def create_project_if_not_exists(self, project):
new_project = False
# Initialize project metadata if needed
with self.engine.connect() as conn:
update_datetime = datetime.utcnow()
Expand All @@ -1036,6 +1048,9 @@ def create_project_if_not_exists(self, project):
insert_stmt = insert(feast_metadata).values(values)
conn.execute(insert_stmt)
usage.set_current_project_uuid(new_project_uuid)
new_project = True

if new_project:
self._set_last_updated_metadata(update_datetime, project)

def _delete_object(
Expand Down Expand Up @@ -1198,10 +1213,25 @@ def get_all_project_metadata(self) -> List[ProjectMetadataModel]:
)
return list(project_metadata_model_dict.values())

def get_project_metadata(self, project: str) -> ProjectMetadataModel:
def get_project_metadata(
self,
project: str,
allow_cache: bool = False,
) -> ProjectMetadataModel:
"""
Returns the given project's metadata. There is no supporting function in the SQL Registry, so this is implemented here rather than using _get_last_updated_metadata and list_project_metadata.
"""

if allow_cache:
self._check_if_registry_refreshed()
project_metadata_proto = proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
if project_metadata_proto is not None:
return ProjectMetadataModel.from_project_metadata(
ProjectMetadata.from_proto(project_metadata_proto)
)

project_metadata_model: ProjectMetadataModel = ProjectMetadataModel(
project_name=project
)
Expand Down
Loading

0 comments on commit 81edb15

Please sign in to comment.