From 259a79f21a48a9dc79ebe2007a8deec9346fbc2b Mon Sep 17 00:00:00 2001 From: vbhagwat <[vbhagwat@expediagroup.com]> Date: Tue, 30 Jan 2024 10:08:43 -0800 Subject: [PATCH 01/20] accepting client_id header --- sdk/python/feast/infra/registry/http.py | 280 +++++++++++++----------- 1 file changed, 153 insertions(+), 127 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 2e3c5fac5c..b20828bee9 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -61,22 +61,25 @@ class HttpRegistryConfig(RegistryConfig): """ str: Endpoint of Feature registry. If registry_type is 'http', then this is a endpoint of Feature Registry """ + clint_id: StrictStr = "Unknown" + CACHE_REFRESH_THRESHOLD_SECONDS = 300 class HttpRegistry(BaseRegistry): def __init__( - self, - registry_config: Optional[Union[RegistryConfig, HttpRegistryConfig]], - project: str, - repo_path: Optional[Path], + self, + registry_config: Optional[Union[RegistryConfig, HttpRegistryConfig]], + project: str, + repo_path: Optional[Path], ): assert registry_config is not None, "HTTPRegistry needs a valid registry_config" # Timeouts in seconds timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path + self.client_id = registry_config.clint_id self.http_client = httpx.Client( timeout=timeout, transport=transport, @@ -120,9 +123,9 @@ def _handle_exception(self, exception: Exception): logger.exception("Request failed with exception: %s", str(exception)) raise httpx.HTTPError("Request failed with exception: " + str(exception)) - def _send_request(self, method: str, url: str, params=None, data=None): + def _send_request(self, method: str, url: str, params=None, data=None, headers= None): try: - request = httpx.Request(method=method, url=url, params=params, data=data) + request = httpx.Request(method=method, url=url, params=params, data=data, headers=headers) response = self.http_client.send(request) response.raise_for_status() return response.json() @@ -135,7 +138,8 @@ def apply_project(self, project: str, commit: bool = True) -> ProjectMetadataMod try: url = f"{self.base_url}/projects" params = {"project": project, "commit": commit} - response_data = self._send_request("PUT", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("PUT", url, params=params, headers = headers) return ProjectMetadataModel.parse_obj(response_data) except Exception as exception: self._handle_exception(exception) @@ -145,7 +149,8 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): url = f"{self.base_url}/projects/{project}/entities" data = EntityModel.from_entity(entity).json() params = {"commit": commit} - response_data = self._send_request("PUT", url, params=params, data=data) + headers = {"client_id": self.client_id} + response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) return EntityModel.parse_obj(response_data).to_entity() except Exception as exception: self._handle_exception(exception) @@ -154,7 +159,8 @@ def delete_entity(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"commit": commit} - self._send_request("DELETE", url, params=params) + headers = {"client_id": self.client_id} + self._send_request("DELETE", url, params=params, headers= headers) logger.info(f"Deleted Entity {name} from project {project}") except EntityNotFoundException as exception: logger.error( @@ -165,10 +171,10 @@ def delete_entity(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_entity( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> Entity: if allow_cache: self._check_if_registry_refreshed() @@ -178,7 +184,8 @@ def get_entity( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) return EntityModel.parse_obj(response_data).to_entity() except EntityNotFoundException as exception: logger.error( @@ -189,9 +196,9 @@ def get_entity( # type: ignore[return] self._handle_exception(exception) def list_entities( # type: ignore[return] - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[Entity]: if allow_cache: self._check_if_registry_refreshed() @@ -201,7 +208,8 @@ def list_entities( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/entities" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) response_list = response_data if isinstance(response_data, list) else [] return [ EntityModel.parse_obj(entity).to_entity() for entity in response_list @@ -210,14 +218,15 @@ def list_entities( # type: ignore[return] self._handle_exception(exception) def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True + self, data_source: DataSource, project: str, commit: bool = True ): try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"commit": commit} + headers = {"client_id": self.client_id} if isinstance(data_source, SparkSource): data = SparkSourceModel.from_data_source(data_source).json() - response_data = self._send_request("PUT", url, params=params, data=data) + response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) return SparkSourceModel.parse_obj(response_data).to_data_source() elif isinstance(data_source, RequestSource): data = RequestSourceModel.from_data_source(data_source).json() @@ -238,7 +247,8 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"commit": commit} - self._send_request("DELETE", url, params=params) + headers = {"client_id": self.client_id} + self._send_request("DELETE", url, params=params, headers = headers) logger.info(f"Deleted Datasource {name} from project {project}") except DataSourceObjectNotFoundException as exception: logger.error( @@ -249,10 +259,10 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_data_source( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> DataSource: if allow_cache: self._check_if_registry_refreshed() @@ -262,7 +272,8 @@ def get_data_source( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers =headers) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": return RequestSourceModel.parse_obj(response_data).to_data_source() @@ -280,9 +291,9 @@ def get_data_source( # type: ignore[return] self._handle_exception(exception) def list_data_sources( # type: ignore[return] - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[DataSource]: if allow_cache: self._check_if_registry_refreshed() @@ -292,7 +303,8 @@ def list_data_sources( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) response_list = response_data if isinstance(response_data, list) else [] data_source_list = [] for data_source in response_list: @@ -311,13 +323,14 @@ def list_data_sources( # type: ignore[return] self._handle_exception(exception) def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True + self, feature_service: FeatureService, project: str, commit: bool = True ): try: url = f"{self.base_url}/projects/{project}/feature_services" data = FeatureServiceModel.from_feature_service(feature_service).json() params = {"commit": commit} - response_data = self._send_request("PUT", url, params=params, data=data) + headers = {"client_id": self.client_id} + response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except Exception as exception: self._handle_exception(exception) @@ -326,7 +339,8 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"commit": commit} - self._send_request("DELETE", url, params=params) + headers = {"client_id": self.client_id} + self._send_request("DELETE", url, params=params, headers = headers) logger.info(f"Deleted FeatureService {name} from project {project}") except FeatureServiceNotFoundException as exception: logger.error( @@ -338,10 +352,10 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_feature_service( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> FeatureService: if allow_cache: self._check_if_registry_refreshed() @@ -351,7 +365,8 @@ def get_feature_service( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers= headers) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: logger.error( @@ -362,7 +377,7 @@ def get_feature_service( # type: ignore[return] self._handle_exception(exception) def list_feature_services( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureService]: if allow_cache: self._check_if_registry_refreshed() @@ -372,7 +387,8 @@ def list_feature_services( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_services" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureServiceModel.parse_obj(feature_service).to_feature_service() @@ -382,19 +398,20 @@ def list_feature_services( # type: ignore[return] self._handle_exception(exception) def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True + self, feature_view: BaseFeatureView, project: str, commit: bool = True ): try: params = {"commit": commit} + headers = {"client_id": self.client_id} if isinstance(feature_view, FeatureView): url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data) + response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) return FeatureViewModel.parse_obj(response_data).to_feature_view() elif isinstance(feature_view, OnDemandFeatureView): url = f"{self.base_url}/projects/{project}/on_demand_feature_views" data = OnDemandFeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data) + response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) return OnDemandFeatureViewModel.parse_obj( response_data ).to_feature_view() @@ -409,7 +426,8 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"commit": commit} - self._send_request("DELETE", url, params=params) + headers = {"client_id": self.client_id} + self._send_request("DELETE", url, params=params, headers=headers) logger.info(f"Deleted FeatureView {name} from project {project}") except FeatureViewNotFoundException as exception: logger.error( @@ -421,10 +439,10 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_feature_view( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> FeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -434,7 +452,8 @@ def get_feature_view( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -445,7 +464,7 @@ def get_feature_view( # type: ignore[return] self._handle_exception(exception) def list_feature_views( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -455,7 +474,8 @@ def list_feature_views( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_views" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -465,7 +485,7 @@ def list_feature_views( # type: ignore[return] self._handle_exception(exception) def get_on_demand_feature_view( # type: ignore[return] - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -475,7 +495,8 @@ def get_on_demand_feature_view( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -486,7 +507,7 @@ def get_on_demand_feature_view( # type: ignore[return] self._handle_exception(exception) def list_on_demand_feature_views( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -496,7 +517,8 @@ def list_on_demand_feature_views( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) response_list = response_data if isinstance(response_data, list) else [] return [ OnDemandFeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -506,51 +528,52 @@ def list_on_demand_feature_views( # type: ignore[return] self._handle_exception(exception) def get_stream_feature_view( - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ): raise NotImplementedError("Method not implemented") def list_stream_feature_views( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[StreamFeatureView]: # TODO: Implement listing Stream Feature Views return [] def get_request_feature_view( - self, - name: str, - project: str, + self, + name: str, + project: str, ) -> RequestFeatureView: raise NotImplementedError("Method not implemented") def list_request_feature_views( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[RequestFeatureView]: # TODO: Implement listing Request Feature Views return [] def apply_materialization( - self, - feature_view: FeatureView, - project: str, - start_date: datetime, - end_date: datetime, - commit: bool = True, + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, ): try: if isinstance(feature_view, FeatureView): feature_view.materialization_intervals.append((start_date, end_date)) params = {"commit": commit} + headers = {client_id: self.client_id} url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data) + response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) return FeatureViewModel.parse_obj(response_data).to_feature_view() else: raise TypeError( @@ -560,30 +583,30 @@ def apply_materialization( self._handle_exception(exception) def apply_saved_dataset( - self, saved_dataset: SavedDataset, project: str, commit: bool = True + self, saved_dataset: SavedDataset, project: str, commit: bool = True ): raise NotImplementedError("Method not implemented") def get_saved_dataset( - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> SavedDataset: raise NotImplementedError("Method not implemented") def list_saved_datasets( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[SavedDataset]: pass def apply_validation_reference( - self, - validation_reference: ValidationReference, - project: str, - commit: bool = True, + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, ): raise NotImplementedError("Method not implemented") @@ -591,10 +614,10 @@ def delete_validation_reference(self, name: str, project: str, commit: bool = Tr raise NotImplementedError("Method not implemented") def get_validation_reference( - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> ValidationReference: raise NotImplementedError("Method not implemented") @@ -602,30 +625,30 @@ def update_infra(self, infra: Infra, project: str, commit: bool = True): raise NotImplementedError("Method not implemented") def get_infra( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> Infra: # TODO: Need to implement this when necessary return Infra() def apply_user_metadata( - self, - project: str, - feature_view: BaseFeatureView, - metadata_bytes: Optional[bytes], + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], ): raise NotImplementedError("Method not implemented") def get_user_metadata( - self, project: str, feature_view: BaseFeatureView + self, project: str, feature_view: BaseFeatureView ) -> Optional[bytes]: raise NotImplementedError("Method not implemented") def list_validation_references( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[ValidationReference]: pass @@ -695,19 +718,19 @@ def refresh(self, project: Optional[str] = None): def _refresh_cached_registry_if_necessary(self): with self._refresh_lock: expired = ( - self.cached_registry_proto is None - or self.cached_registry_proto_created is None - ) or ( - self.cached_registry_proto_ttl.total_seconds() - > 0 # 0 ttl means infinity - and ( - datetime.utcnow() - > ( - self.cached_registry_proto_created - + self.cached_registry_proto_ttl - ) - ) - ) + self.cached_registry_proto is None + or self.cached_registry_proto_created is None + ) or ( + self.cached_registry_proto_ttl.total_seconds() + > 0 # 0 ttl means infinity + and ( + datetime.utcnow() + > ( + self.cached_registry_proto_created + + self.cached_registry_proto_ttl + ) + ) + ) if expired: logger.info("Registry cache expired, so refreshing") @@ -715,17 +738,17 @@ def _refresh_cached_registry_if_necessary(self): def _check_if_registry_refreshed(self): if ( - self.cached_registry_proto is None - or self.cached_registry_proto_created is None + self.cached_registry_proto is None + or self.cached_registry_proto_created is None ) or ( - self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity - and ( - datetime.utcnow() - > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) - ) + self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity + and ( + datetime.utcnow() + > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) + ) ): seconds_since_last_refresh = ( - datetime.utcnow() - self.cached_registry_proto_created + datetime.utcnow() - self.cached_registry_proto_created ).total_seconds() if seconds_since_last_refresh > CACHE_REFRESH_THRESHOLD_SECONDS: logger.warning( @@ -735,7 +758,8 @@ def _check_if_registry_refreshed(self): def _get_all_projects(self) -> Set[str]: # type: ignore[return] try: url = f"{self.base_url}/projects" - projects = self._send_request("GET", url) + headers = {"client_id": self.client_id} + projects = self._send_request("GET", url, headers=headers) return {project["project_name"] for project in projects} except Exception as exception: self._handle_exception(exception) @@ -743,7 +767,8 @@ def _get_all_projects(self) -> Set[str]: # type: ignore[return] def _get_last_updated_metadata(self, project: str): try: url = f"{self.base_url}/projects/{project}" - response_data = self._send_request("GET", url) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, headers=headers) return datetime.strptime( response_data["last_updated_timestamp"], "%Y-%m-%dT%H:%M:%S" ) @@ -751,7 +776,7 @@ def _get_last_updated_metadata(self, project: str): self._handle_exception(exception) def list_project_metadata( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: if allow_cache: self._check_if_registry_refreshed() @@ -761,7 +786,8 @@ def list_project_metadata( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}" params = {"allow_cache": True} - response_data = self._send_request("GET", url, params=params) + headers = {"client_id": self.client_id} + response_data = self._send_request("GET", url, params=params, headers=headers) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: logger.error( From 10cbf2287946a0f23dd09ca0918f6240a55e3ab4 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 31 Jan 2024 08:48:44 -0800 Subject: [PATCH 02/20] made client_id optional --- sdk/python/feast/infra/registry/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index b20828bee9..dc377df946 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -61,7 +61,7 @@ class HttpRegistryConfig(RegistryConfig): """ str: Endpoint of Feature registry. If registry_type is 'http', then this is a endpoint of Feature Registry """ - clint_id: StrictStr = "Unknown" + clint_id: Optional[StrictStr] = "Unknown" CACHE_REFRESH_THRESHOLD_SECONDS = 300 From e67f988f7f7bffa3d2593f86189601f4dcea60ae Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 31 Jan 2024 10:50:00 -0800 Subject: [PATCH 03/20] added client_id to registry_config and fixed a syntax error --- sdk/python/feast/infra/registry/http.py | 2 +- sdk/python/feast/repo_config.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index dc377df946..a9aa30f0cc 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -570,7 +570,7 @@ def apply_materialization( if isinstance(feature_view, FeatureView): feature_view.materialization_intervals.append((start_date, end_date)) params = {"commit": commit} - headers = {client_id: self.client_id} + headers = {"client_id": self.client_id} url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 1278752574..bd6737f781 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -131,6 +131,7 @@ class RegistryConfig(FeastBaseModel): s3_additional_kwargs: Optional[Dict[str, str]] """ Dict[str, str]: Extra arguments to pass to boto3 when writing the registry file to S3. """ + clint_id: Optional[StrictStr] = "Unknown" class RepoConfig(FeastBaseModel): From ffad62d53cca89d6680a4dd8e99d0b4c14b2b9e2 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 31 Jan 2024 11:18:26 -0800 Subject: [PATCH 04/20] fixed linting errors --- sdk/python/feast/infra/registry/http.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index a9aa30f0cc..23a9ab4a36 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -123,7 +123,7 @@ def _handle_exception(self, exception: Exception): logger.exception("Request failed with exception: %s", str(exception)) raise httpx.HTTPError("Request failed with exception: " + str(exception)) - def _send_request(self, method: str, url: str, params=None, data=None, headers= None): + def _send_request(self, method: str, url: str, params=None, data=None, headers=None): try: request = httpx.Request(method=method, url=url, params=params, data=data, headers=headers) response = self.http_client.send(request) @@ -139,7 +139,7 @@ def apply_project(self, project: str, commit: bool = True) -> ProjectMetadataMod url = f"{self.base_url}/projects" params = {"project": project, "commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, headers = headers) + response_data = self._send_request("PUT", url, params=params, headers=headers) return ProjectMetadataModel.parse_obj(response_data) except Exception as exception: self._handle_exception(exception) @@ -160,7 +160,7 @@ def delete_entity(self, name: str, project: str, commit: bool = True): url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"commit": commit} headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers= headers) + self._send_request("DELETE", url, params=params, headers=headers) logger.info(f"Deleted Entity {name} from project {project}") except EntityNotFoundException as exception: logger.error( @@ -248,7 +248,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"commit": commit} headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers = headers) + self._send_request("DELETE", url, params=params, headers=headers) logger.info(f"Deleted Datasource {name} from project {project}") except DataSourceObjectNotFoundException as exception: logger.error( @@ -273,7 +273,7 @@ def get_data_source( # type: ignore[return] url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers =headers) + response_data = self._send_request("GET", url, params=params, headers=headers) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": return RequestSourceModel.parse_obj(response_data).to_data_source() @@ -340,7 +340,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"commit": commit} headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers = headers) + self._send_request("DELETE", url, params=params, headers=headers) logger.info(f"Deleted FeatureService {name} from project {project}") except FeatureServiceNotFoundException as exception: logger.error( @@ -366,7 +366,7 @@ def get_feature_service( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers= headers) + response_data = self._send_request("GET", url, params=params, headers=headers) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: logger.error( @@ -721,14 +721,10 @@ def _refresh_cached_registry_if_necessary(self): self.cached_registry_proto is None or self.cached_registry_proto_created is None ) or ( - self.cached_registry_proto_ttl.total_seconds() - > 0 # 0 ttl means infinity + self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity and ( datetime.utcnow() - > ( - self.cached_registry_proto_created - + self.cached_registry_proto_ttl - ) + > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) ) ) From abb86bf256a1764daa51daaf5b3bd75cc731644b Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Wed, 31 Jan 2024 12:07:45 -0800 Subject: [PATCH 05/20] fixed lint errors --- sdk/python/feast/infra/registry/http.py | 38 ++++++++++++++----------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 23a9ab4a36..02bc2527bb 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -718,15 +718,19 @@ def refresh(self, project: Optional[str] = None): def _refresh_cached_registry_if_necessary(self): with self._refresh_lock: expired = ( - self.cached_registry_proto is None - or self.cached_registry_proto_created is None - ) or ( - self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity - and ( - datetime.utcnow() - > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) - ) - ) + self.cached_registry_proto is None + or self.cached_registry_proto_created is None + ) or ( + self.cached_registry_proto_ttl.total_seconds() + > 0 # 0 ttl means infinity + and ( + datetime.utcnow() + > ( + self.cached_registry_proto_created + + self.cached_registry_proto_ttl + ) + ) + ) if expired: logger.info("Registry cache expired, so refreshing") @@ -734,17 +738,17 @@ def _refresh_cached_registry_if_necessary(self): def _check_if_registry_refreshed(self): if ( - self.cached_registry_proto is None - or self.cached_registry_proto_created is None + self.cached_registry_proto is None + or self.cached_registry_proto_created is None ) or ( - self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity - and ( - datetime.utcnow() - > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) - ) + self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity + and ( + datetime.utcnow() + > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) + ) ): seconds_since_last_refresh = ( - datetime.utcnow() - self.cached_registry_proto_created + datetime.utcnow() - self.cached_registry_proto_created ).total_seconds() if seconds_since_last_refresh > CACHE_REFRESH_THRESHOLD_SECONDS: logger.warning( From a6650fbf26697886d6ef6fd74a20ee632df49284 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 10:23:12 -0800 Subject: [PATCH 06/20] fixed lint error --- sdk/python/feast/infra/registry/http.py | 154 ++++++++++++------------ 1 file changed, 77 insertions(+), 77 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 02bc2527bb..8f6213f0b4 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -69,10 +69,10 @@ class HttpRegistryConfig(RegistryConfig): class HttpRegistry(BaseRegistry): def __init__( - self, - registry_config: Optional[Union[RegistryConfig, HttpRegistryConfig]], - project: str, - repo_path: Optional[Path], + self, + registry_config: Optional[Union[RegistryConfig, HttpRegistryConfig]], + project: str, + repo_path: Optional[Path], ): assert registry_config is not None, "HTTPRegistry needs a valid registry_config" # Timeouts in seconds @@ -171,10 +171,10 @@ def delete_entity(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_entity( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> Entity: if allow_cache: self._check_if_registry_refreshed() @@ -196,9 +196,9 @@ def get_entity( # type: ignore[return] self._handle_exception(exception) def list_entities( # type: ignore[return] - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[Entity]: if allow_cache: self._check_if_registry_refreshed() @@ -218,7 +218,7 @@ def list_entities( # type: ignore[return] self._handle_exception(exception) def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True + self, data_source: DataSource, project: str, commit: bool = True ): try: url = f"{self.base_url}/projects/{project}/data_sources" @@ -259,10 +259,10 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_data_source( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> DataSource: if allow_cache: self._check_if_registry_refreshed() @@ -291,9 +291,9 @@ def get_data_source( # type: ignore[return] self._handle_exception(exception) def list_data_sources( # type: ignore[return] - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[DataSource]: if allow_cache: self._check_if_registry_refreshed() @@ -323,7 +323,7 @@ def list_data_sources( # type: ignore[return] self._handle_exception(exception) def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True + self, feature_service: FeatureService, project: str, commit: bool = True ): try: url = f"{self.base_url}/projects/{project}/feature_services" @@ -352,10 +352,10 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_feature_service( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> FeatureService: if allow_cache: self._check_if_registry_refreshed() @@ -377,7 +377,7 @@ def get_feature_service( # type: ignore[return] self._handle_exception(exception) def list_feature_services( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureService]: if allow_cache: self._check_if_registry_refreshed() @@ -398,7 +398,7 @@ def list_feature_services( # type: ignore[return] self._handle_exception(exception) def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True + self, feature_view: BaseFeatureView, project: str, commit: bool = True ): try: params = {"commit": commit} @@ -439,10 +439,10 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): self._handle_exception(exception) def get_feature_view( # type: ignore[return] - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> FeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -464,7 +464,7 @@ def get_feature_view( # type: ignore[return] self._handle_exception(exception) def list_feature_views( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[FeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -485,7 +485,7 @@ def list_feature_views( # type: ignore[return] self._handle_exception(exception) def get_on_demand_feature_view( # type: ignore[return] - self, name: str, project: str, allow_cache: bool = False + self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: if allow_cache: self._check_if_registry_refreshed() @@ -507,7 +507,7 @@ def get_on_demand_feature_view( # type: ignore[return] self._handle_exception(exception) def list_on_demand_feature_views( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: if allow_cache: self._check_if_registry_refreshed() @@ -528,25 +528,25 @@ def list_on_demand_feature_views( # type: ignore[return] self._handle_exception(exception) def get_stream_feature_view( - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ): raise NotImplementedError("Method not implemented") def list_stream_feature_views( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[StreamFeatureView]: # TODO: Implement listing Stream Feature Views return [] def get_request_feature_view( - self, - name: str, - project: str, + self, + name: str, + project: str, ) -> RequestFeatureView: raise NotImplementedError("Method not implemented") @@ -559,12 +559,12 @@ def list_request_feature_views( return [] def apply_materialization( - self, - feature_view: FeatureView, - project: str, - start_date: datetime, - end_date: datetime, - commit: bool = True, + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, ): try: if isinstance(feature_view, FeatureView): @@ -583,30 +583,30 @@ def apply_materialization( self._handle_exception(exception) def apply_saved_dataset( - self, saved_dataset: SavedDataset, project: str, commit: bool = True + self, saved_dataset: SavedDataset, project: str, commit: bool = True ): raise NotImplementedError("Method not implemented") def get_saved_dataset( - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> SavedDataset: raise NotImplementedError("Method not implemented") def list_saved_datasets( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[SavedDataset]: pass def apply_validation_reference( - self, - validation_reference: ValidationReference, - project: str, - commit: bool = True, + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, ): raise NotImplementedError("Method not implemented") @@ -614,10 +614,10 @@ def delete_validation_reference(self, name: str, project: str, commit: bool = Tr raise NotImplementedError("Method not implemented") def get_validation_reference( - self, - name: str, - project: str, - allow_cache: bool = False, + self, + name: str, + project: str, + allow_cache: bool = False, ) -> ValidationReference: raise NotImplementedError("Method not implemented") @@ -625,30 +625,30 @@ def update_infra(self, infra: Infra, project: str, commit: bool = True): raise NotImplementedError("Method not implemented") def get_infra( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> Infra: # TODO: Need to implement this when necessary return Infra() def apply_user_metadata( - self, - project: str, - feature_view: BaseFeatureView, - metadata_bytes: Optional[bytes], + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], ): raise NotImplementedError("Method not implemented") def get_user_metadata( - self, project: str, feature_view: BaseFeatureView + self, project: str, feature_view: BaseFeatureView ) -> Optional[bytes]: raise NotImplementedError("Method not implemented") def list_validation_references( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[ValidationReference]: pass @@ -776,7 +776,7 @@ def _get_last_updated_metadata(self, project: str): self._handle_exception(exception) def list_project_metadata( # type: ignore[return] - self, project: str, allow_cache: bool = False + self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: if allow_cache: self._check_if_registry_refreshed() From 062064405b9c5658bce4a99ab9da565ba84504f0 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 10:24:44 -0800 Subject: [PATCH 07/20] fixed lint error --- sdk/python/feast/infra/registry/http.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 8f6213f0b4..e50a0fabac 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -551,9 +551,9 @@ def get_request_feature_view( raise NotImplementedError("Method not implemented") def list_request_feature_views( - self, - project: str, - allow_cache: bool = False, + self, + project: str, + allow_cache: bool = False, ) -> List[RequestFeatureView]: # TODO: Implement listing Request Feature Views return [] From f1a82c28d36868d52bb45423c8a97d4969b1d73e Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 11:56:47 -0800 Subject: [PATCH 08/20] fix lint error --- sdk/python/feast/infra/registry/http.py | 80 ++++++++++++++++++------- 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index e50a0fabac..abaf9c4afd 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -123,9 +123,13 @@ def _handle_exception(self, exception: Exception): logger.exception("Request failed with exception: %s", str(exception)) raise httpx.HTTPError("Request failed with exception: " + str(exception)) - def _send_request(self, method: str, url: str, params=None, data=None, headers=None): + def _send_request( + self, method: str, url: str, params=None, data=None, headers=None + ): try: - request = httpx.Request(method=method, url=url, params=params, data=data, headers=headers) + request = httpx.Request( + method=method, url=url, params=params, data=data, headers=headers + ) response = self.http_client.send(request) response.raise_for_status() return response.json() @@ -139,7 +143,9 @@ def apply_project(self, project: str, commit: bool = True) -> ProjectMetadataMod url = f"{self.base_url}/projects" params = {"project": project, "commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, headers=headers + ) return ProjectMetadataModel.parse_obj(response_data) except Exception as exception: self._handle_exception(exception) @@ -150,7 +156,9 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): data = EntityModel.from_entity(entity).json() params = {"commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return EntityModel.parse_obj(response_data).to_entity() except Exception as exception: self._handle_exception(exception) @@ -185,7 +193,9 @@ def get_entity( # type: ignore[return] url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return EntityModel.parse_obj(response_data).to_entity() except EntityNotFoundException as exception: logger.error( @@ -209,7 +219,9 @@ def list_entities( # type: ignore[return] url = f"{self.base_url}/projects/{project}/entities" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ EntityModel.parse_obj(entity).to_entity() for entity in response_list @@ -226,7 +238,9 @@ def apply_data_source( headers = {"client_id": self.client_id} if isinstance(data_source, SparkSource): data = SparkSourceModel.from_data_source(data_source).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return SparkSourceModel.parse_obj(response_data).to_data_source() elif isinstance(data_source, RequestSource): data = RequestSourceModel.from_data_source(data_source).json() @@ -273,7 +287,9 @@ def get_data_source( # type: ignore[return] url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": return RequestSourceModel.parse_obj(response_data).to_data_source() @@ -304,7 +320,9 @@ def list_data_sources( # type: ignore[return] url = f"{self.base_url}/projects/{project}/data_sources" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] data_source_list = [] for data_source in response_list: @@ -330,7 +348,9 @@ def apply_feature_service( data = FeatureServiceModel.from_feature_service(feature_service).json() params = {"commit": commit} headers = {"client_id": self.client_id} - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except Exception as exception: self._handle_exception(exception) @@ -366,7 +386,9 @@ def get_feature_service( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: logger.error( @@ -388,7 +410,9 @@ def list_feature_services( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_services" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureServiceModel.parse_obj(feature_service).to_feature_service() @@ -406,12 +430,16 @@ def apply_feature_view( if isinstance(feature_view, FeatureView): url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return FeatureViewModel.parse_obj(response_data).to_feature_view() elif isinstance(feature_view, OnDemandFeatureView): url = f"{self.base_url}/projects/{project}/on_demand_feature_views" data = OnDemandFeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return OnDemandFeatureViewModel.parse_obj( response_data ).to_feature_view() @@ -453,7 +481,9 @@ def get_feature_view( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -475,7 +505,9 @@ def list_feature_views( # type: ignore[return] url = f"{self.base_url}/projects/{project}/feature_views" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -496,7 +528,9 @@ def get_on_demand_feature_view( # type: ignore[return] url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -518,7 +552,9 @@ def list_on_demand_feature_views( # type: ignore[return] url = f"{self.base_url}/projects/{project}/on_demand_feature_views" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) response_list = response_data if isinstance(response_data, list) else [] return [ OnDemandFeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -573,7 +609,9 @@ def apply_materialization( headers = {"client_id": self.client_id} url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request("PUT", url, params=params, data=data, headers=headers) + response_data = self._send_request( + "PUT", url, params=params, data=data, headers=headers + ) return FeatureViewModel.parse_obj(response_data).to_feature_view() else: raise TypeError( @@ -787,7 +825,9 @@ def list_project_metadata( # type: ignore[return] url = f"{self.base_url}/projects/{project}" params = {"allow_cache": True} headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, params=params, headers=headers) + response_data = self._send_request( + "GET", url, params=params, headers=headers + ) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: logger.error( From 3aca15def7db9961aaa0b5522a89def3e0b7bf24 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 14:40:02 -0800 Subject: [PATCH 09/20] addressed review comment --- sdk/python/feast/infra/registry/http.py | 77 +++++++++---------------- 1 file changed, 27 insertions(+), 50 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index abaf9c4afd..c57c7575ec 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -61,7 +61,7 @@ class HttpRegistryConfig(RegistryConfig): """ str: Endpoint of Feature registry. If registry_type is 'http', then this is a endpoint of Feature Registry """ - clint_id: Optional[StrictStr] = "Unknown" + client_id: Optional[StrictStr] = "Unknown" CACHE_REFRESH_THRESHOLD_SECONDS = 300 @@ -79,11 +79,10 @@ def __init__( timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path - self.client_id = registry_config.clint_id self.http_client = httpx.Client( timeout=timeout, transport=transport, - headers={"Content-Type": "application/json"}, + headers={"Content-Type": "application/json", "client_id": registry_config.clint_id}, ) self.project = project self.apply_project(self.project) @@ -142,9 +141,8 @@ def apply_project(self, project: str, commit: bool = True) -> ProjectMetadataMod try: url = f"{self.base_url}/projects" params = {"project": project, "commit": commit} - headers = {"client_id": self.client_id} response_data = self._send_request( - "PUT", url, params=params, headers=headers + "PUT", url, params=params ) return ProjectMetadataModel.parse_obj(response_data) except Exception as exception: @@ -155,9 +153,9 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): url = f"{self.base_url}/projects/{project}/entities" data = EntityModel.from_entity(entity).json() params = {"commit": commit} - headers = {"client_id": self.client_id} + response_data = self._send_request( - "PUT", url, params=params, data=data, headers=headers + "PUT", url, params=params, data=data ) return EntityModel.parse_obj(response_data).to_entity() except Exception as exception: @@ -167,8 +165,7 @@ def delete_entity(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"commit": commit} - headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers=headers) + self._send_request("DELETE", url, params=params) logger.info(f"Deleted Entity {name} from project {project}") except EntityNotFoundException as exception: logger.error( @@ -192,9 +189,8 @@ def get_entity( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) return EntityModel.parse_obj(response_data).to_entity() except EntityNotFoundException as exception: @@ -218,9 +214,8 @@ def list_entities( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/entities" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -235,11 +230,10 @@ def apply_data_source( try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"commit": commit} - headers = {"client_id": self.client_id} if isinstance(data_source, SparkSource): data = SparkSourceModel.from_data_source(data_source).json() response_data = self._send_request( - "PUT", url, params=params, data=data, headers=headers + "PUT", url, params=params, data=data ) return SparkSourceModel.parse_obj(response_data).to_data_source() elif isinstance(data_source, RequestSource): @@ -261,8 +255,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"commit": commit} - headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers=headers) + self._send_request("DELETE", url, params=params) logger.info(f"Deleted Datasource {name} from project {project}") except DataSourceObjectNotFoundException as exception: logger.error( @@ -286,9 +279,8 @@ def get_data_source( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": @@ -319,9 +311,8 @@ def list_data_sources( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) response_list = response_data if isinstance(response_data, list) else [] data_source_list = [] @@ -347,9 +338,8 @@ def apply_feature_service( url = f"{self.base_url}/projects/{project}/feature_services" data = FeatureServiceModel.from_feature_service(feature_service).json() params = {"commit": commit} - headers = {"client_id": self.client_id} response_data = self._send_request( - "PUT", url, params=params, data=data, headers=headers + "PUT", url, params=params, data=data ) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except Exception as exception: @@ -359,8 +349,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"commit": commit} - headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers=headers) + self._send_request("DELETE", url, params=params) logger.info(f"Deleted FeatureService {name} from project {project}") except FeatureServiceNotFoundException as exception: logger.error( @@ -385,9 +374,8 @@ def get_feature_service( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: @@ -409,9 +397,8 @@ def list_feature_services( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_services" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -426,19 +413,18 @@ def apply_feature_view( ): try: params = {"commit": commit} - headers = {"client_id": self.client_id} if isinstance(feature_view, FeatureView): url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() response_data = self._send_request( - "PUT", url, params=params, data=data, headers=headers + "PUT", url, params=params, data=data ) return FeatureViewModel.parse_obj(response_data).to_feature_view() elif isinstance(feature_view, OnDemandFeatureView): url = f"{self.base_url}/projects/{project}/on_demand_feature_views" data = OnDemandFeatureViewModel.from_feature_view(feature_view).json() response_data = self._send_request( - "PUT", url, params=params, data=data, headers=headers + "PUT", url, params=params, data=data ) return OnDemandFeatureViewModel.parse_obj( response_data @@ -454,8 +440,7 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"commit": commit} - headers = {"client_id": self.client_id} - self._send_request("DELETE", url, params=params, headers=headers) + self._send_request("DELETE", url, params=params) logger.info(f"Deleted FeatureView {name} from project {project}") except FeatureViewNotFoundException as exception: logger.error( @@ -480,9 +465,8 @@ def get_feature_view( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: @@ -504,9 +488,8 @@ def list_feature_views( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_views" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -527,9 +510,8 @@ def get_on_demand_feature_view( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: @@ -551,9 +533,8 @@ def list_on_demand_feature_views( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) response_list = response_data if isinstance(response_data, list) else [] return [ @@ -606,11 +587,10 @@ def apply_materialization( if isinstance(feature_view, FeatureView): feature_view.materialization_intervals.append((start_date, end_date)) params = {"commit": commit} - headers = {"client_id": self.client_id} url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() response_data = self._send_request( - "PUT", url, params=params, data=data, headers=headers + "PUT", url, params=params, data=data ) return FeatureViewModel.parse_obj(response_data).to_feature_view() else: @@ -796,8 +776,7 @@ def _check_if_registry_refreshed(self): def _get_all_projects(self) -> Set[str]: # type: ignore[return] try: url = f"{self.base_url}/projects" - headers = {"client_id": self.client_id} - projects = self._send_request("GET", url, headers=headers) + projects = self._send_request("GET", url) return {project["project_name"] for project in projects} except Exception as exception: self._handle_exception(exception) @@ -805,8 +784,7 @@ def _get_all_projects(self) -> Set[str]: # type: ignore[return] def _get_last_updated_metadata(self, project: str): try: url = f"{self.base_url}/projects/{project}" - headers = {"client_id": self.client_id} - response_data = self._send_request("GET", url, headers=headers) + response_data = self._send_request("GET", url) return datetime.strptime( response_data["last_updated_timestamp"], "%Y-%m-%dT%H:%M:%S" ) @@ -824,9 +802,8 @@ def list_project_metadata( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}" params = {"allow_cache": True} - headers = {"client_id": self.client_id} response_data = self._send_request( - "GET", url, params=params, headers=headers + "GET", url, params=params ) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: From f728b4c9d70595767b704d60fa699a295ba4f78c Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 14:42:04 -0800 Subject: [PATCH 10/20] formatting --- sdk/python/feast/infra/registry/http.py | 77 +++++++------------------ 1 file changed, 22 insertions(+), 55 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index c57c7575ec..19bc79f8a0 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -82,7 +82,10 @@ def __init__( self.http_client = httpx.Client( timeout=timeout, transport=transport, - headers={"Content-Type": "application/json", "client_id": registry_config.clint_id}, + headers={ + "Content-Type": "application/json", + "client_id": registry_config.clint_id, + }, ) self.project = project self.apply_project(self.project) @@ -141,9 +144,7 @@ def apply_project(self, project: str, commit: bool = True) -> ProjectMetadataMod try: url = f"{self.base_url}/projects" params = {"project": project, "commit": commit} - response_data = self._send_request( - "PUT", url, params=params - ) + response_data = self._send_request("PUT", url, params=params) return ProjectMetadataModel.parse_obj(response_data) except Exception as exception: self._handle_exception(exception) @@ -154,9 +155,7 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): data = EntityModel.from_entity(entity).json() params = {"commit": commit} - response_data = self._send_request( - "PUT", url, params=params, data=data - ) + response_data = self._send_request("PUT", url, params=params, data=data) return EntityModel.parse_obj(response_data).to_entity() except Exception as exception: self._handle_exception(exception) @@ -189,9 +188,7 @@ def get_entity( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) return EntityModel.parse_obj(response_data).to_entity() except EntityNotFoundException as exception: logger.error( @@ -214,9 +211,7 @@ def list_entities( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/entities" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ EntityModel.parse_obj(entity).to_entity() for entity in response_list @@ -232,9 +227,7 @@ def apply_data_source( params = {"commit": commit} if isinstance(data_source, SparkSource): data = SparkSourceModel.from_data_source(data_source).json() - response_data = self._send_request( - "PUT", url, params=params, data=data - ) + response_data = self._send_request("PUT", url, params=params, data=data) return SparkSourceModel.parse_obj(response_data).to_data_source() elif isinstance(data_source, RequestSource): data = RequestSourceModel.from_data_source(data_source).json() @@ -279,9 +272,7 @@ def get_data_source( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) if "model_type" in response_data: if response_data["model_type"] == "RequestSourceModel": return RequestSourceModel.parse_obj(response_data).to_data_source() @@ -311,9 +302,7 @@ def list_data_sources( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] data_source_list = [] for data_source in response_list: @@ -338,9 +327,7 @@ def apply_feature_service( url = f"{self.base_url}/projects/{project}/feature_services" data = FeatureServiceModel.from_feature_service(feature_service).json() params = {"commit": commit} - response_data = self._send_request( - "PUT", url, params=params, data=data - ) + response_data = self._send_request("PUT", url, params=params, data=data) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except Exception as exception: self._handle_exception(exception) @@ -374,9 +361,7 @@ def get_feature_service( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_services/{name}" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: logger.error( @@ -397,9 +382,7 @@ def list_feature_services( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_services" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureServiceModel.parse_obj(feature_service).to_feature_service() @@ -416,16 +399,12 @@ def apply_feature_view( if isinstance(feature_view, FeatureView): url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request( - "PUT", url, params=params, data=data - ) + response_data = self._send_request("PUT", url, params=params, data=data) return FeatureViewModel.parse_obj(response_data).to_feature_view() elif isinstance(feature_view, OnDemandFeatureView): url = f"{self.base_url}/projects/{project}/on_demand_feature_views" data = OnDemandFeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request( - "PUT", url, params=params, data=data - ) + response_data = self._send_request("PUT", url, params=params, data=data) return OnDemandFeatureViewModel.parse_obj( response_data ).to_feature_view() @@ -465,9 +444,7 @@ def get_feature_view( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -488,9 +465,7 @@ def list_feature_views( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/feature_views" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ FeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -510,9 +485,7 @@ def get_on_demand_feature_view( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: logger.error( @@ -533,9 +506,7 @@ def list_on_demand_feature_views( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}/on_demand_feature_views" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] return [ OnDemandFeatureViewModel.parse_obj(feature_view).to_feature_view() @@ -589,9 +560,7 @@ def apply_materialization( params = {"commit": commit} url = f"{self.base_url}/projects/{project}/feature_views" data = FeatureViewModel.from_feature_view(feature_view).json() - response_data = self._send_request( - "PUT", url, params=params, data=data - ) + response_data = self._send_request("PUT", url, params=params, data=data) return FeatureViewModel.parse_obj(response_data).to_feature_view() else: raise TypeError( @@ -802,9 +771,7 @@ def list_project_metadata( # type: ignore[return] try: url = f"{self.base_url}/projects/{project}" params = {"allow_cache": True} - response_data = self._send_request( - "GET", url, params=params - ) + response_data = self._send_request("GET", url, params=params) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: logger.error( From 1aa7a819ae65b68697ac99a44c51c1efa0acbf0a Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 14:45:18 -0800 Subject: [PATCH 11/20] formatting --- sdk/python/feast/infra/registry/http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 19bc79f8a0..3007622d29 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -126,11 +126,11 @@ def _handle_exception(self, exception: Exception): raise httpx.HTTPError("Request failed with exception: " + str(exception)) def _send_request( - self, method: str, url: str, params=None, data=None, headers=None + self, method: str, url: str, params=None, data=None ): try: request = httpx.Request( - method=method, url=url, params=params, data=data, headers=headers + method=method, url=url, params=params, data=data ) response = self.http_client.send(request) response.raise_for_status() From e458014e3ddd6fdd7a8622c5669056b00be39dda Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 14:45:59 -0800 Subject: [PATCH 12/20] formatting --- sdk/python/feast/infra/registry/http.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 3007622d29..b06fa2f753 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -125,13 +125,9 @@ def _handle_exception(self, exception: Exception): logger.exception("Request failed with exception: %s", str(exception)) raise httpx.HTTPError("Request failed with exception: " + str(exception)) - def _send_request( - self, method: str, url: str, params=None, data=None - ): + def _send_request(self, method: str, url: str, params=None, data=None): try: - request = httpx.Request( - method=method, url=url, params=params, data=data - ) + request = httpx.Request(method=method, url=url, params=params, data=data) response = self.http_client.send(request) response.raise_for_status() return response.json() From 7c3fb1dbd1e5769450a21fd768c6b13a597d6689 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 15:10:11 -0800 Subject: [PATCH 13/20] fixed an error with header --- sdk/python/feast/infra/registry/http.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index b06fa2f753..fe01a34990 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -79,13 +79,13 @@ def __init__( timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path + headers = [ + ("Content-Type", "application/json"), + ("client_id", registry_config.client_id), + ] + self.http_client = httpx.Client( - timeout=timeout, - transport=transport, - headers={ - "Content-Type": "application/json", - "client_id": registry_config.clint_id, - }, + timeout=timeout, transport=transport, headers=headers ) self.project = project self.apply_project(self.project) From 0b9d07daf9527ee854af69fb24762d5094726338 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 15:29:18 -0800 Subject: [PATCH 14/20] fixed a type --- sdk/python/feast/repo_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index bd6737f781..bdf063f6e4 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -131,7 +131,7 @@ class RegistryConfig(FeastBaseModel): s3_additional_kwargs: Optional[Dict[str, str]] """ Dict[str, str]: Extra arguments to pass to boto3 when writing the registry file to S3. """ - clint_id: Optional[StrictStr] = "Unknown" + client_id: Optional[StrictStr] = "Unknown" class RepoConfig(FeastBaseModel): From d0b46b5e0bddf45274e554fee6a62f75e0e2e71a Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 15:39:37 -0800 Subject: [PATCH 15/20] fixed a typo --- sdk/python/feast/infra/registry/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index fe01a34990..d3e1e1d3d0 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -81,7 +81,7 @@ def __init__( self.base_url = registry_config.path headers = [ ("Content-Type", "application/json"), - ("client_id", registry_config.client_id), + ("Client-Id", registry_config.client_id), ] self.http_client = httpx.Client( From cef1a397509be495d7d6683791b8b41c9e4cf95d Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 15:52:34 -0800 Subject: [PATCH 16/20] fixing header --- sdk/python/feast/infra/registry/http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index d3e1e1d3d0..b4009710fa 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -79,10 +79,10 @@ def __init__( timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path - headers = [ + headers = httpx.Headers([ ("Content-Type", "application/json"), ("Client-Id", registry_config.client_id), - ] + ]) self.http_client = httpx.Client( timeout=timeout, transport=transport, headers=headers From e822a7402ae7333e3bd7a291695d9e224578fd2a Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 16:08:04 -0800 Subject: [PATCH 17/20] fixing header --- sdk/python/feast/infra/registry/http.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index b4009710fa..f214580196 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -79,11 +79,12 @@ def __init__( timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path - headers = httpx.Headers([ - ("Content-Type", "application/json"), - ("Client-Id", registry_config.client_id), - ]) - + headers = { + "Content-Type", + "application/json", + "Client-Id", + registry_config.client_id, + } self.http_client = httpx.Client( timeout=timeout, transport=transport, headers=headers ) From 10387fec6f3fd5fa9724ea432bb7b4ca2d97a2a2 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 16:09:26 -0800 Subject: [PATCH 18/20] fixing header --- sdk/python/feast/infra/registry/http.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index f214580196..9c1be2ecf6 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -80,10 +80,8 @@ def __init__( transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path headers = { - "Content-Type", - "application/json", - "Client-Id", - registry_config.client_id, + "Content-Type": "application/json", + "Client-Id": registry_config.client_id, } self.http_client = httpx.Client( timeout=timeout, transport=transport, headers=headers From 762f53a6aefc02c018bc46533eb975fed1a7319c Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 22:55:08 -0800 Subject: [PATCH 19/20] fixing header --- sdk/python/feast/infra/registry/http.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 9c1be2ecf6..a45d5d8ec6 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -79,10 +79,12 @@ def __init__( timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path - headers = { - "Content-Type": "application/json", - "Client-Id": registry_config.client_id, - } + headers = httpx.Headers( + { + "Content-Type": "application/json", + "Client-Id": registry_config.client_id, + } + ) self.http_client = httpx.Client( timeout=timeout, transport=transport, headers=headers ) From cd817391b58ef42e2220f2fb8cff1170c743b268 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Thu, 1 Feb 2024 23:21:58 -0800 Subject: [PATCH 20/20] fixing header --- sdk/python/feast/infra/registry/http.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index a45d5d8ec6..dac963a95a 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -79,12 +79,14 @@ def __init__( timeout = httpx.Timeout(5.0, connect=60.0) transport = httpx.HTTPTransport(retries=3, verify=False) self.base_url = registry_config.path + headers_dict = { + "Content-Type": "application/json", + "Client-Id": registry_config.client_id, + } headers = httpx.Headers( - { - "Content-Type": "application/json", - "Client-Id": registry_config.client_id, - } + {k: str(v) for k, v in headers_dict.items() if v is not None} ) + self.http_client = httpx.Client( timeout=timeout, transport=transport, headers=headers )