Skip to content

Commit

Permalink
enhance: add restful api to trigger component stop (milvus-io#32076) (m…
Browse files Browse the repository at this point in the history
…ilvus-io#33799)

issue: milvus-io#32698
pr: milvus-io#32076
This PR adds two REST APIs for stopping a component and checking its status:
1. `/management/stop?role=querynode` can stop the specified component
2. `/management/check/ready?role=rootcoord` can check whether the target
component is serviceable

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Jun 17, 2024
1 parent 08c096c commit 4513569
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 78 deletions.
35 changes: 33 additions & 2 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/http/healthz"
Expand Down Expand Up @@ -254,11 +255,11 @@ func (mr *MilvusRoles) setupLogger() {
// setupPrometheusHTTPServer registers two metrics endpoints through
// http.Register: one backed by promhttp.HandlerFor for the given registry r,
// and one backed by promhttp.Handler().
func setupPrometheusHTTPServer(r *internalmetrics.MilvusRegistry) {
log.Info("setupPrometheusHTTPServer")
http.Register(&http.Handler{
Path: "/metrics",
Path: http.MetricsPath,
Handler: promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
})
http.Register(&http.Handler{
Path: "/metrics_default",
Path: http.MetricsDefaultPath,
Handler: promhttp.Handler(),
})
}
Expand Down Expand Up @@ -357,41 +358,71 @@ func (mr *MilvusRoles) Run() {
var wg sync.WaitGroup
local := mr.Local

componentMap := make(map[string]component)
var rootCoord, queryCoord, indexCoord, dataCoord component
var proxy, dataNode, indexNode, queryNode component
if mr.EnableRootCoord {
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
}

if mr.EnableDataCoord {
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
}

if mr.EnableIndexCoord {
indexCoord = mr.runIndexCoord(ctx, local, &wg)
componentMap[typeutil.IndexCoordRole] = indexCoord
}

if mr.EnableQueryCoord {
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
}

if mr.EnableQueryNode {
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
}

if mr.EnableDataNode {
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
}
if mr.EnableIndexNode {
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
}

if mr.EnableProxy {
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
}

wg.Wait()

http.RegisterStopComponent(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
return fmt.Errorf("stop component [%s] in [%s] is not supported", role, mr.ServerType)
}
return componentMap[role].Stop()
})

http.RegisterCheckComponentReady(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
return fmt.Errorf("check component state for [%s] in [%s] is not supported", role, mr.ServerType)
}

// for coord component, if it's in standby state, it will return StateCode_StandBy
code := componentMap[role].Health(context.TODO())
if code != commonpb.StateCode_Healthy {
return fmt.Errorf("component [%s] in [%s] is not healthy", role, mr.ServerType)
}

return nil
})

mr.setupLogger()
tracer.Init()
paramtable.Get().WatchKeyPrefix("trace", config.NewHandler("tracing handler", func(e *config.Event) {
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *Server) registerHTTPServer() {
apiv1 := metricsGinHandler.Group(apiPathPrefix)
httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1)
management.Register(&management.Handler{
Path: "/",
Path: management.RootPath,
HandlerFunc: nil,
Handler: metricsGinHandler.Handler(),
})
Expand Down
31 changes: 31 additions & 0 deletions internal/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,34 @@ const EventLogRouterPath = "/eventlog"

// ExprPath is path for expression.
const ExprPath = "/expr"

// RootPath is the root ("/") HTTP path.
const RootPath = "/"

// Prometheus REST API paths.
const (
MetricsPath = "/metrics"
MetricsDefaultPath = "/metrics_default"
)

// Per-component management REST API paths: every component registers its own
// endpoints to trigger a stop and to check readiness.
const (
RouteTriggerStopPath = "/management/stop"
RouteCheckComponentReady = "/management/check/ready"
)

// Proxy management REST API paths.
const (
// DataCoord garbage collection control.
RouteGcPause = "/management/datacoord/garbage_collection/pause"
RouteGcResume = "/management/datacoord/garbage_collection/resume"

// QueryCoord balance control and segment/channel transfer.
RouteSuspendQueryCoordBalance = "/management/querycoord/balance/suspend"
RouteResumeQueryCoordBalance = "/management/querycoord/balance/resume"
RouteTransferSegment = "/management/querycoord/transfer/segment"
RouteTransferChannel = "/management/querycoord/transfer/channel"

// QueryNode suspend/resume, listing, and distribution get/check.
RouteSuspendQueryNode = "/management/querycoord/node/suspend"
RouteResumeQueryNode = "/management/querycoord/node/resume"
RouteListQueryNode = "/management/querycoord/node/list"
RouteGetQueryNodeDistribution = "/management/querycoord/distribution/get"
RouteCheckQueryNodeDistribution = "/management/querycoord/distribution/check"
)
40 changes: 40 additions & 0 deletions internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,46 @@ func registerDefaults() {
})
}

// RegisterStopComponent registers the RouteTriggerStopPath REST handler.
//
// The handler reads the target component from the "role" query parameter and
// passes it to triggerComponentStop. It replies with HTTP 200 and
// {"msg": "OK"} on success, or HTTP 500 and a {"msg": "..."} body describing
// the failure when triggerComponentStop returns an error.
func RegisterStopComponent(triggerComponentStop func(role string) error) {
	Register(&Handler{
		Path: RouteTriggerStopPath,
		HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
			role := req.URL.Query().Get("role")
			log.Info("start to trigger component stop", zap.String("role", role))
			if err := triggerComponentStop(role); err != nil {
				log.Warn("failed to trigger component stop", zap.String("role", role), zap.Error(err))
				writeManagementMsg(w, http.StatusInternalServerError,
					"failed to trigger component stop, "+err.Error())
				return
			}
			log.Info("finish to trigger component stop", zap.String("role", role))
			writeManagementMsg(w, http.StatusOK, "OK")
		},
	})
}

// RegisterCheckComponentReady registers the RouteCheckComponentReady REST
// handler.
//
// The handler reads the target component from the "role" query parameter and
// passes it to checkActive. It replies with HTTP 200 and {"msg": "OK"} when
// checkActive returns nil, or HTTP 500 and a {"msg": "..."} body describing
// the failure otherwise.
func RegisterCheckComponentReady(checkActive func(role string) error) {
	Register(&Handler{
		Path: RouteCheckComponentReady,
		HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
			role := req.URL.Query().Get("role")
			log.Info("start to check component ready", zap.String("role", role))
			if err := checkActive(role); err != nil {
				log.Warn("failed to check component ready", zap.String("role", role), zap.Error(err))
				writeManagementMsg(w, http.StatusInternalServerError,
					"failed to check component ready, "+err.Error())
				return
			}
			log.Info("finish to check component ready", zap.String("role", role))
			writeManagementMsg(w, http.StatusOK, "OK")
		},
	})
}

// writeManagementMsg writes a {"msg": "<msg>"} JSON body with the given
// status code. The message is JSON-escaped because error texts may embed the
// caller-supplied role, which would otherwise be able to break the JSON body.
func writeManagementMsg(w http.ResponseWriter, status int, msg string) {
	// Headers must be set before WriteHeader for them to be sent.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if _, err := fmt.Fprintf(w, `{"msg": %s}`, quoteJSONString(msg)); err != nil {
		log.Warn("failed to write management response", zap.Error(err))
	}
}

// quoteJSONString returns s as a double-quoted JSON string literal. It is
// deliberately dependency-free so that no additional import is required.
func quoteJSONString(s string) string {
	const hexDigits = "0123456789abcdef"
	buf := make([]byte, 0, len(s)+2)
	buf = append(buf, '"')
	// Ranging over a string decodes UTF-8; invalid bytes become U+FFFD, so the
	// output is always valid UTF-8.
	for _, r := range s {
		switch {
		case r == '"' || r == '\\':
			buf = append(buf, '\\', byte(r))
		case r == '\n':
			buf = append(buf, '\\', 'n')
		case r == '\r':
			buf = append(buf, '\\', 'r')
		case r == '\t':
			buf = append(buf, '\\', 't')
		case r < 0x20:
			// Remaining control characters must be \u-escaped in JSON.
			buf = append(buf, '\\', 'u', '0', '0', hexDigits[r>>4], hexDigits[r&0xf])
		default:
			buf = append(buf, string(r)...)
		}
	}
	buf = append(buf, '"')
	return string(buf)
}

func Register(h *Handler) {
if metricsServer == nil {
if paramtable.Get().HTTPCfg.EnablePprof.GetAsBool() {
Expand Down
39 changes: 11 additions & 28 deletions internal/proxy/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,69 +32,52 @@ import (
)

// this file contains proxy management restful API handler

// Paths of the proxy management REST endpoints.
const (
// DataCoord garbage collection control.
mgrRouteGcPause = `/management/datacoord/garbage_collection/pause`
mgrRouteGcResume = `/management/datacoord/garbage_collection/resume`

// QueryCoord balance control and segment/channel transfer.
mgrSuspendQueryCoordBalance = `/management/querycoord/balance/suspend`
mgrResumeQueryCoordBalance = `/management/querycoord/balance/resume`
mgrTransferSegment = `/management/querycoord/transfer/segment`
mgrTransferChannel = `/management/querycoord/transfer/channel`

// QueryNode suspend/resume, listing, and distribution get/check.
mgrSuspendQueryNode = `/management/querycoord/node/suspend`
mgrResumeQueryNode = `/management/querycoord/node/resume`
mgrListQueryNode = `/management/querycoord/node/list`
mgrGetQueryNodeDistribution = `/management/querycoord/distribution/get`
mgrCheckQueryNodeDistribution = `/management/querycoord/distribution/check`
)

var mgrRouteRegisterOnce sync.Once

func RegisterMgrRoute(proxy *Proxy) {
mgrRouteRegisterOnce.Do(func() {
management.Register(&management.Handler{
Path: mgrRouteGcPause,
Path: management.RouteGcPause,
HandlerFunc: proxy.PauseDatacoordGC,
})
management.Register(&management.Handler{
Path: mgrRouteGcResume,
Path: management.RouteGcResume,
HandlerFunc: proxy.ResumeDatacoordGC,
})
management.Register(&management.Handler{
Path: mgrListQueryNode,
Path: management.RouteListQueryNode,
HandlerFunc: proxy.ListQueryNode,
})
management.Register(&management.Handler{
Path: mgrGetQueryNodeDistribution,
Path: management.RouteGetQueryNodeDistribution,
HandlerFunc: proxy.GetQueryNodeDistribution,
})
management.Register(&management.Handler{
Path: mgrSuspendQueryCoordBalance,
Path: management.RouteSuspendQueryCoordBalance,
HandlerFunc: proxy.SuspendQueryCoordBalance,
})
management.Register(&management.Handler{
Path: mgrResumeQueryCoordBalance,
Path: management.RouteResumeQueryCoordBalance,
HandlerFunc: proxy.ResumeQueryCoordBalance,
})
management.Register(&management.Handler{
Path: mgrSuspendQueryNode,
Path: management.RouteSuspendQueryNode,
HandlerFunc: proxy.SuspendQueryNode,
})
management.Register(&management.Handler{
Path: mgrResumeQueryNode,
Path: management.RouteResumeQueryNode,
HandlerFunc: proxy.ResumeQueryNode,
})
management.Register(&management.Handler{
Path: mgrTransferSegment,
Path: management.RouteTransferSegment,
HandlerFunc: proxy.TransferSegment,
})
management.Register(&management.Handler{
Path: mgrTransferChannel,
Path: management.RouteTransferChannel,
HandlerFunc: proxy.TransferChannel,
})
management.Register(&management.Handler{
Path: mgrCheckQueryNodeDistribution,
Path: management.RouteCheckQueryNodeDistribution,
HandlerFunc: proxy.CheckQueryNodeDistribution,
})
})
Expand Down
Loading

0 comments on commit 4513569

Please sign in to comment.