Skip to content

Commit

Permalink
add healthcheck to transformation service/proto
Browse files Browse the repository at this point in the history
  • Loading branch information
William Parsley committed Nov 16, 2023
1 parent 9e95d6c commit e1bc604
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 44 deletions.
16 changes: 16 additions & 0 deletions protos/feast/serving/TransformationService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ service TransformationService {
rpc GetTransformationServiceInfo (GetTransformationServiceInfoRequest) returns (GetTransformationServiceInfoResponse);

rpc TransformFeatures (TransformFeaturesRequest) returns (TransformFeaturesResponse);

rpc Health (HealthCheckRequest) returns (HealthCheckResponse);
}

message ValueType {
Expand Down Expand Up @@ -65,3 +67,17 @@ enum TransformationServiceType {

TRANSFORMATION_SERVICE_TYPE_CUSTOM = 100;
}

enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}

message HealthCheckRequest {
string service = 1;
}

message HealthCheckResponse {
ServingStatus status = 1;
}
51 changes: 7 additions & 44 deletions sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import logging
import sys
from concurrent import futures
import threading

import grpc
import pyarrow as pa
from grpc_reflection.v1alpha import reflection
from fastapi import FastAPI, Response, status
import gunicorn.app.base

from feast.errors import OnDemandFeatureViewNotFoundException
from feast.feature_store import FeatureStore
from feast.protos.feast.serving.TransformationService_pb2 import (
DESCRIPTOR,
TRANSFORMATION_SERVICE_TYPE_PYTHON,
SERVING,
GetTransformationServiceInfoResponse,
TransformFeaturesResponse,
HelathCheckResponse,
ValueType,
)
from feast.protos.feast.serving.TransformationService_pb2_grpc import (
Expand All @@ -26,21 +25,14 @@

log = logging.getLogger(__name__)

def get_health_check_app():
app = FastAPI()

@app.get("/health")
def health():
return Response(status_code=status.HTTP_200_OK)

return app


class TransformationServer(TransformationServiceServicer):
def __init__(self, fs: FeatureStore) -> None:
super().__init__()
self.fs = fs

def Health(self, request, context):
return HealthCheckResponse(status=SERVING)

def GetTransformationServiceInfo(self, request, context):
response = GetTransformationServiceInfoResponse(
type=TRANSFORMATION_SERVICE_TYPE_PYTHON,
Expand Down Expand Up @@ -71,34 +63,6 @@ def TransformFeatures(self, request, context):
return TransformFeaturesResponse(
transformation_output=ValueType(arrow_value=buf)
)
class FeastTransformationServeApplication(gunicorn.app.base.BaseApplication):
def __init__(self, **options):
self._app = get_health_check_app()
self._options = options
super().__init__()

def load_config(self):
for key, value in self._options.items():
if key.lower() in self.cfg.settings and value is not None:
self.cfg.set(key.lower(), value)

self.cfg.set("worker_class", "uvicorn.workers.UvicornWorker")

def load(self):
return self._app

def _start_server(server):
server_thread = threading.Thread(target=_run_server, args=[server])
server_thread.daemon = True
server_thread.start()
return server_thread

def _run_server(server):
try:
server.start()
server.wait_for_termination()
except Exception as e:
print(e)

def start_server(store: FeatureStore, port: int):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Expand All @@ -109,6 +73,5 @@ def start_server(store: FeatureStore, port: int):
)
reflection.enable_server_reflection(service_names_available_for_reflection, server)
server.add_insecure_port(f"[::]:{port}")
server_thread = _start_server(server)
FeastTransformationServeApplication().run()
server_thread.join()
server.start()
server.wait_for_termination()

0 comments on commit e1bc604

Please sign in to comment.