diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 16be7ac378958..fa850518a20d1 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/cmd/components" "github.com/milvus-io/milvus/internal/http" "github.com/milvus-io/milvus/internal/http/healthz" @@ -254,11 +255,11 @@ func (mr *MilvusRoles) setupLogger() { func setupPrometheusHTTPServer(r *internalmetrics.MilvusRegistry) { log.Info("setupPrometheusHTTPServer") http.Register(&http.Handler{ - Path: "/metrics", + Path: http.MetricsPath, Handler: promhttp.HandlerFor(r, promhttp.HandlerOpts{}), }) http.Register(&http.Handler{ - Path: "/metrics_default", + Path: http.MetricsDefaultPath, Handler: promhttp.Handler(), }) } @@ -357,41 +358,71 @@ func (mr *MilvusRoles) Run() { var wg sync.WaitGroup local := mr.Local + componentMap := make(map[string]component) var rootCoord, queryCoord, indexCoord, dataCoord component var proxy, dataNode, indexNode, queryNode component if mr.EnableRootCoord { rootCoord = mr.runRootCoord(ctx, local, &wg) + componentMap[typeutil.RootCoordRole] = rootCoord } if mr.EnableDataCoord { dataCoord = mr.runDataCoord(ctx, local, &wg) + componentMap[typeutil.DataCoordRole] = dataCoord } if mr.EnableIndexCoord { indexCoord = mr.runIndexCoord(ctx, local, &wg) + componentMap[typeutil.IndexCoordRole] = indexCoord } if mr.EnableQueryCoord { queryCoord = mr.runQueryCoord(ctx, local, &wg) + componentMap[typeutil.QueryCoordRole] = queryCoord } if mr.EnableQueryNode { queryNode = mr.runQueryNode(ctx, local, &wg) + componentMap[typeutil.QueryNodeRole] = queryNode } if mr.EnableDataNode { dataNode = mr.runDataNode(ctx, local, &wg) + componentMap[typeutil.DataNodeRole] = dataNode } if mr.EnableIndexNode { indexNode = mr.runIndexNode(ctx, local, &wg) + componentMap[typeutil.IndexNodeRole] = indexNode } if mr.EnableProxy { proxy = mr.runProxy(ctx, local, &wg) + componentMap[typeutil.ProxyRole] = proxy } wg.Wait() + http.RegisterStopComponent(func(role string) error { + if len(role) == 0 || componentMap[role] == nil { + return fmt.Errorf("stop component [%s] in [%s] is not supported", role, mr.ServerType) + } + return componentMap[role].Stop() + }) + + http.RegisterCheckComponentReady(func(role string) error { + if len(role) == 0 || componentMap[role] == nil { + return fmt.Errorf("check component state for [%s] in [%s] is not supported", role, mr.ServerType) + } + + // for coord component, if it's in standby state, it will return StateCode_StandBy + code := componentMap[role].Health(context.TODO()) + if code != commonpb.StateCode_Healthy { + return fmt.Errorf("component [%s] in [%s] is not healthy", role, mr.ServerType) + } + + return nil + }) + mr.setupLogger() tracer.Init() paramtable.Get().WatchKeyPrefix("trace", config.NewHandler("tracing handler", func(e *config.Event) { diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index bb902bc3280ae..5558d3976c640 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -163,7 +163,7 @@ func (s *Server) registerHTTPServer() { apiv1 := metricsGinHandler.Group(apiPathPrefix) httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1) management.Register(&management.Handler{ - Path: "/", + Path: management.RootPath, HandlerFunc: nil, Handler: metricsGinHandler.Handler(), }) diff --git a/internal/http/router.go b/internal/http/router.go index 220ed3d5f7c37..5d3f951b2b75d 100644 --- a/internal/http/router.go +++ b/internal/http/router.go @@ -27,3 +27,34 @@ const EventLogRouterPath = "/eventlog" // ExprPath is path for expression. const ExprPath = "/expr" + +const RootPath = "/" + +// Prometheus restful api path +const ( + MetricsPath = "/metrics" + MetricsDefaultPath = "/metrics_default" +) + +// for every component, register it's own api to trigger stop and check ready +const ( + RouteTriggerStopPath = "/management/stop" + RouteCheckComponentReady = "/management/check/ready" +) + +// proxy management restful api root path +const ( + RouteGcPause = "/management/datacoord/garbage_collection/pause" + RouteGcResume = "/management/datacoord/garbage_collection/resume" + + RouteSuspendQueryCoordBalance = "/management/querycoord/balance/suspend" + RouteResumeQueryCoordBalance = "/management/querycoord/balance/resume" + RouteTransferSegment = "/management/querycoord/transfer/segment" + RouteTransferChannel = "/management/querycoord/transfer/channel" + + RouteSuspendQueryNode = "/management/querycoord/node/suspend" + RouteResumeQueryNode = "/management/querycoord/node/resume" + RouteListQueryNode = "/management/querycoord/node/list" + RouteGetQueryNodeDistribution = "/management/querycoord/distribution/get" + RouteCheckQueryNodeDistribution = "/management/querycoord/distribution/check" +) diff --git a/internal/http/server.go b/internal/http/server.go index 849767b366922..31f03536e73e8 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -88,6 +88,46 @@ func registerDefaults() { }) } +func RegisterStopComponent(triggerComponentStop func(role string) error) { + // register restful api to trigger stop + Register(&Handler{ + Path: RouteTriggerStopPath, + HandlerFunc: func(w http.ResponseWriter, req *http.Request) { + role := req.URL.Query().Get("role") + log.Info("start to trigger component stop", zap.String("role", role)) + if err := triggerComponentStop(role); err != nil { + log.Warn("failed to trigger component stop", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf(`{"msg": "failed to trigger component stop, %s"}`, err.Error()))) + return + } + log.Info("finish to trigger component stop", zap.String("role", role)) + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"msg": "OK"}`)) + }, + }) +} + +func RegisterCheckComponentReady(checkActive func(role string) error) { + // register restful api to check component ready + Register(&Handler{ + Path: RouteCheckComponentReady, + HandlerFunc: func(w http.ResponseWriter, req *http.Request) { + role := req.URL.Query().Get("role") + log.Info("start to check component ready", zap.String("role", role)) + if err := checkActive(role); err != nil { + log.Warn("failed to check component ready", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf(`{"msg": "failed to to check component ready, %s"}`, err.Error()))) + return + } + log.Info("finish to check component ready", zap.String("role", role)) + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"msg": "OK"}`)) + }, + }) +} + func Register(h *Handler) { if metricsServer == nil { if paramtable.Get().HTTPCfg.EnablePprof.GetAsBool() { diff --git a/internal/proxy/management.go b/internal/proxy/management.go index 1abdc892037b5..9b31d487fabf6 100644 --- a/internal/proxy/management.go +++ b/internal/proxy/management.go @@ -32,69 +32,52 @@ import ( ) // this file contains proxy management restful API handler - -const ( - mgrRouteGcPause = `/management/datacoord/garbage_collection/pause` - mgrRouteGcResume = `/management/datacoord/garbage_collection/resume` - - mgrSuspendQueryCoordBalance = `/management/querycoord/balance/suspend` - mgrResumeQueryCoordBalance = `/management/querycoord/balance/resume` - mgrTransferSegment = `/management/querycoord/transfer/segment` - mgrTransferChannel = `/management/querycoord/transfer/channel` - - mgrSuspendQueryNode = `/management/querycoord/node/suspend` - mgrResumeQueryNode = `/management/querycoord/node/resume` - mgrListQueryNode = `/management/querycoord/node/list` - mgrGetQueryNodeDistribution = `/management/querycoord/distribution/get` - mgrCheckQueryNodeDistribution = `/management/querycoord/distribution/check` -) - var mgrRouteRegisterOnce sync.Once func RegisterMgrRoute(proxy *Proxy) { mgrRouteRegisterOnce.Do(func() { management.Register(&management.Handler{ - Path: mgrRouteGcPause, + Path: management.RouteGcPause, HandlerFunc: proxy.PauseDatacoordGC, }) management.Register(&management.Handler{ - Path: mgrRouteGcResume, + Path: management.RouteGcResume, HandlerFunc: proxy.ResumeDatacoordGC, }) management.Register(&management.Handler{ - Path: mgrListQueryNode, + Path: management.RouteListQueryNode, HandlerFunc: proxy.ListQueryNode, }) management.Register(&management.Handler{ - Path: mgrGetQueryNodeDistribution, + Path: management.RouteGetQueryNodeDistribution, HandlerFunc: proxy.GetQueryNodeDistribution, }) management.Register(&management.Handler{ - Path: mgrSuspendQueryCoordBalance, + Path: management.RouteSuspendQueryCoordBalance, HandlerFunc: proxy.SuspendQueryCoordBalance, }) management.Register(&management.Handler{ - Path: mgrResumeQueryCoordBalance, + Path: management.RouteResumeQueryCoordBalance, HandlerFunc: proxy.ResumeQueryCoordBalance, }) management.Register(&management.Handler{ - Path: mgrSuspendQueryNode, + Path: management.RouteSuspendQueryNode, HandlerFunc: proxy.SuspendQueryNode, }) management.Register(&management.Handler{ - Path: mgrResumeQueryNode, + Path: management.RouteResumeQueryNode, HandlerFunc: proxy.ResumeQueryNode, }) management.Register(&management.Handler{ - Path: mgrTransferSegment, + Path: management.RouteTransferSegment, HandlerFunc: proxy.TransferSegment, }) management.Register(&management.Handler{ - Path: mgrTransferChannel, + Path: management.RouteTransferChannel, HandlerFunc: proxy.TransferChannel, }) management.Register(&management.Handler{ - Path: mgrCheckQueryNodeDistribution, + Path: management.RouteCheckQueryNodeDistribution, HandlerFunc: proxy.CheckQueryNodeDistribution, }) }) diff --git a/internal/proxy/management_test.go b/internal/proxy/management_test.go index 930fbd01047bb..ed652c5383cce 100644 --- a/internal/proxy/management_test.go +++ b/internal/proxy/management_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + management "github.com/milvus-io/milvus/internal/http" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -66,7 +67,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() { return &commonpb.Status{}, nil }) - req, err := http.NewRequest(http.MethodGet, mgrRouteGcPause+"?pause_seconds=60", nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -82,7 +83,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() { return &commonpb.Status{}, errors.New("mock") }) - req, err := http.NewRequest(http.MethodGet, mgrRouteGcPause+"?pause_seconds=60", nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -101,7 +102,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() { }, nil }) - req, err := http.NewRequest(http.MethodGet, mgrRouteGcPause+"?pause_seconds=60", nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -120,7 +121,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() { return &commonpb.Status{}, nil }) - req, err := http.NewRequest(http.MethodGet, mgrRouteGcResume, nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -136,7 +137,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() { return &commonpb.Status{}, errors.New("mock") }) - req, err := http.NewRequest(http.MethodGet, mgrRouteGcResume, nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -155,7 +156,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() { }, nil }) - req, err := http.NewRequest(http.MethodGet, mgrRouteGcResume, nil) + req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -181,7 +182,7 @@ func (s *ProxyManagementSuite) TestListQueryNode() { }, }, nil) - req, err := http.NewRequest(http.MethodPost, mgrListQueryNode, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteListQueryNode, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -195,7 +196,7 @@ func (s *ProxyManagementSuite) TestListQueryNode() { defer s.TearDownTest() s.querycoord.EXPECT().ListQueryNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err := http.NewRequest(http.MethodPost, mgrListQueryNode, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteListQueryNode, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -211,7 +212,7 @@ func (s *ProxyManagementSuite) TestListQueryNode() { Status: merr.Status(merr.ErrServiceNotReady), }, nil) - req, err := http.NewRequest(http.MethodPost, mgrListQueryNode, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteListQueryNode, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -232,7 +233,7 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() { SealedSegmentIDs: []int64{1, 2, 3}, }, nil) - req, err := http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") @@ -247,14 +248,14 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() { defer s.TearDownTest() // test invalid request body - req, err := http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() s.proxy.GetQueryNodeDistribution(recorder, req) s.Equal(http.StatusBadRequest, recorder.Code) // test miss requested param - req, err = http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("")) + req, err = http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -263,7 +264,7 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() { // test rpc return error s.querycoord.EXPECT().GetQueryNodeDistribution(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err = http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("node_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -276,7 +277,7 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() { defer s.TearDownTest() s.querycoord.EXPECT().GetQueryNodeDistribution(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err := http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -292,7 +293,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryCoordBalance() { s.querycoord.EXPECT().SuspendBalance(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryCoordBalance, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryCoordBalance, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -306,7 +307,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryCoordBalance() { defer s.TearDownTest() s.querycoord.EXPECT().SuspendBalance(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryCoordBalance, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryCoordBalance, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -319,7 +320,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryCoordBalance() { defer s.TearDownTest() s.querycoord.EXPECT().SuspendBalance(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil) - req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryCoordBalance, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryCoordBalance, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -335,7 +336,7 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() { s.querycoord.EXPECT().ResumeBalance(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrResumeQueryCoordBalance, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryCoordBalance, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -349,7 +350,7 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() { defer s.TearDownTest() s.querycoord.EXPECT().ResumeBalance(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err := http.NewRequest(http.MethodPost, mgrResumeQueryCoordBalance, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryCoordBalance, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -362,7 +363,7 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() { defer s.TearDownTest() s.querycoord.EXPECT().ResumeBalance(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil) - req, err := http.NewRequest(http.MethodPost, mgrResumeQueryCoordBalance, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryCoordBalance, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() @@ -378,7 +379,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() { s.querycoord.EXPECT().SuspendNode(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") @@ -393,14 +394,14 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() { defer s.TearDownTest() // test invalid request body - req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryNode, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() s.proxy.SuspendQueryNode(recorder, req) s.Equal(http.StatusBadRequest, recorder.Code) // test miss requested param - req, err = http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("")) + req, err = http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -409,7 +410,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() { // test rpc return error s.querycoord.EXPECT().SuspendNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err = http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("node_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -422,7 +423,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() { defer s.TearDownTest() s.querycoord.EXPECT().SuspendNode(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil) - req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -438,7 +439,7 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() { s.querycoord.EXPECT().ResumeNode(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") @@ -453,14 +454,14 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() { defer s.TearDownTest() // test invalid request body - req, err := http.NewRequest(http.MethodPost, mgrResumeQueryNode, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() s.proxy.ResumeQueryNode(recorder, req) s.Equal(http.StatusBadRequest, recorder.Code) // test miss requested param - req, err = http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("")) + req, err = http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -469,7 +470,7 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() { // test rpc return error s.querycoord.EXPECT().ResumeNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err = http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("node_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -482,7 +483,7 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() { defer s.TearDownTest() s.querycoord.EXPECT().ResumeNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err := http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -498,7 +499,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() { s.querycoord.EXPECT().TransferSegment(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1©_mode=false")) + req, err := http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1©_mode=false")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -507,7 +508,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() { s.Equal(`{"msg": "OK"}`, recorder.Body.String()) // test use default param - req, err = http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -521,14 +522,14 @@ func (s *ProxyManagementSuite) TestTransferSegment() { defer s.TearDownTest() // test invalid request body - req, err := http.NewRequest(http.MethodPost, mgrTransferSegment, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteTransferSegment, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() s.proxy.TransferSegment(recorder, req) s.Equal(http.StatusBadRequest, recorder.Code) // test miss requested param - req, err = http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("")) + req, err = http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -537,7 +538,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() { // test rpc return error s.querycoord.EXPECT().TransferSegment(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err = http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -550,7 +551,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() { defer s.TearDownTest() s.querycoord.EXPECT().TransferSegment(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil) - req, err := http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -566,7 +567,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() { s.querycoord.EXPECT().TransferChannel(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1©_mode=false")) + req, err := http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1©_mode=false")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -575,7 +576,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() { s.Equal(`{"msg": "OK"}`, recorder.Body.String()) // test use default param - req, err = http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -589,14 +590,14 @@ func (s *ProxyManagementSuite) TestTransferChannel() { defer s.TearDownTest() // test invalid request body - req, err := http.NewRequest(http.MethodPost, mgrTransferChannel, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteTransferChannel, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() s.proxy.TransferChannel(recorder, req) s.Equal(http.StatusBadRequest, recorder.Code) // test miss requested param - req, err = http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("")) + req, err = http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -605,7 +606,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() { // test rpc return error s.querycoord.EXPECT().TransferChannel(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err = http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -618,7 +619,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() { defer s.TearDownTest() s.querycoord.EXPECT().TransferChannel(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil) - req, err := http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -634,7 +635,7 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() { s.querycoord.EXPECT().CheckQueryNodeDistribution(mock.Anything, mock.Anything).Return(merr.Success(), nil) - req, err := http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder() @@ -648,14 +649,14 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() { defer s.TearDownTest() // test invalid request body - req, err := http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, nil) + req, err := http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, nil) s.Require().NoError(err) recorder := httptest.NewRecorder() s.proxy.CheckQueryNodeDistribution(recorder, req) s.Equal(http.StatusBadRequest, recorder.Code) // test miss requested param - req, err = http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("")) + req, err = http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -664,7 +665,7 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() { // test rpc return error s.querycoord.EXPECT().CheckQueryNodeDistribution(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error")) - req, err = http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1")) + req, err = http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder = httptest.NewRecorder() @@ -677,7 +678,7 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() { defer s.TearDownTest() s.querycoord.EXPECT().CheckQueryNodeDistribution(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil) - req, err := http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1")) + req, err := http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1")) s.Require().NoError(err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") recorder := httptest.NewRecorder()