diff --git a/protos/feast/third_party/grpc/health/v1/HealthService.proto b/protos/feast/third_party/grpc/health/v1/HealthService.proto index 3c5504e982..7ee681f736 100644 --- a/protos/feast/third_party/grpc/health/v1/HealthService.proto +++ b/protos/feast/third_party/grpc/health/v1/HealthService.proto @@ -21,4 +21,4 @@ message HealthCheckResponse { service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); -} \ No newline at end of file +} diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index c18350cb7d..21a8cb71d1 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -3,8 +3,6 @@ from concurrent import futures import grpc -from grpc_health.v1 import health -from grpc_health.v1 import health_pb2_grpc import pyarrow as pa from grpc_reflection.v1alpha import reflection @@ -21,11 +19,28 @@ TransformationServiceServicer, add_TransformationServiceServicer_to_server, ) +from feast.protos.feast.third_party.grpc.health.v1.HealthService_pb2 import ( + HealthCheckResponse, + ServingStatus, +) +from feast.protos.feast.third_party.grpc.health.v1.HealthService_pb2_grpc import ( + HealthServicer, + add_HealthServicer_to_server, +) from feast.version import get_version log = logging.getLogger(__name__) +class HealthServer(HealthServicer): + def __init__(self) -> None: + super().__init__() + + def Check(self, request, context): + response = HealthCheckResponse(status=ServingStatus.SERVING) + return response + + class TransformationServer(TransformationServiceServicer): def __init__(self, fs: FeatureStore) -> None: super().__init__() @@ -64,12 +79,12 @@ def TransformFeatures(self, request, context): def start_server(store: FeatureStore, port: int): + log.info("Starting server..") server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_TransformationServiceServicer_to_server(TransformationServer(store), server) - # Add health check service - health_servicer = health.HealthServicer() - health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server) + # Add health check service to server + add_HealthServicer_to_server(HealthServer(), server) service_names_available_for_reflection = ( DESCRIPTOR.services_by_name["TransformationService"].full_name,