From dc598fde3ad50c03fcbd9aacedf7a42af85a1d2a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 15 Oct 2024 11:07:31 +0800 Subject: [PATCH] Introduce QueryRegion stream Signed-off-by: JmPotato --- pkg/configpb/configpb.pb.gw.go | 22 +++++++- proto/pdpb.proto | 25 +++++++++ scripts/proto.lock | 94 ++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 2 deletions(-) diff --git a/pkg/configpb/configpb.pb.gw.go b/pkg/configpb/configpb.pb.gw.go index c0385b0e4..9b4675b32 100644 --- a/pkg/configpb/configpb.pb.gw.go +++ b/pkg/configpb/configpb.pb.gw.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -30,6 +31,7 @@ var _ status.Status var _ = runtime.String var _ = utilities.NewDoubleArray var _ = descriptor.ForMessage +var _ = metadata.Join var ( filter_Config_Get_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} @@ -55,7 +57,10 @@ func local_request_Config_Get_0(ctx context.Context, marshaler runtime.Marshaler var protoReq GetRequest var metadata runtime.ServerMetadata - if err := runtime.PopulateQueryParameters(&protoReq, req.URL.Query(), filter_Config_Get_0); err != nil { + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Config_Get_0); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } @@ -122,7 +127,10 @@ func local_request_Config_Delete_0(ctx context.Context, marshaler runtime.Marsha var protoReq DeleteRequest var metadata runtime.ServerMetadata - if err := runtime.PopulateQueryParameters(&protoReq, req.URL.Query(), filter_Config_Delete_0); err != nil { + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Config_Delete_0); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } @@ -134,11 +142,14 @@ func local_request_Config_Delete_0(ctx context.Context, marshaler runtime.Marsha // RegisterConfigHandlerServer registers the http handlers for service Config to "mux". // UnaryRPC :call ConfigServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterConfigHandlerFromEndpoint instead. func RegisterConfigHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ConfigServer) error { mux.Handle("GET", pattern_Config_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -146,6 +157,7 @@ func RegisterConfigHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser return } resp, md, err := local_request_Config_Get_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -159,6 +171,8 @@ func RegisterConfigHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser mux.Handle("POST", pattern_Config_Update_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -166,6 +180,7 @@ func RegisterConfigHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser return } resp, md, err := local_request_Config_Update_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -179,6 +194,8 @@ func RegisterConfigHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser mux.Handle("DELETE", pattern_Config_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -186,6 +203,7 @@ func RegisterConfigHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser return } resp, md, err := local_request_Config_Delete_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) diff --git a/proto/pdpb.proto b/proto/pdpb.proto index 1bbfcd4ca..8c849400a 100644 --- a/proto/pdpb.proto +++ b/proto/pdpb.proto @@ -51,6 +51,8 @@ service PD { rpc GetRegionByID(GetRegionByIDRequest) returns (GetRegionResponse) {} + rpc QueryRegion(stream QueryRegionRequest) returns (stream QueryRegionResponse) {} + // Deprecated: use BatchScanRegions instead. rpc ScanRegions(ScanRegionsRequest) returns (ScanRegionsResponse) {} @@ -334,6 +336,29 @@ message GetRegionByIDRequest { bool need_buckets = 3; } +message QueryRegionRequest { + RequestHeader header = 1; + + bool need_buckets = 2; + repeated bytes region_keys = 3; + repeated uint64 region_ids = 4; +} + +message QueryRegionResponse { + ResponseHeader header = 1; + + map regions_by_key = 2; + map regions_by_id = 3; +} + +message RegionResponse { + metapb.Region region = 1; + metapb.Peer leader = 2; + repeated PeerStats down_peers = 3; + repeated metapb.Peer pending_peers = 4; + metapb.Buckets buckets = 5; +} + // Use GetRegionResponse as the response of GetRegionByIDRequest. // Deprecated: use BatchScanRegionsRequest instead. message ScanRegionsRequest { diff --git a/scripts/proto.lock b/scripts/proto.lock index cb0ea582f..ce806eafa 100644 --- a/scripts/proto.lock +++ b/scripts/proto.lock @@ -14265,6 +14265,93 @@ } ] }, + { + "name": "QueryRegionRequest", + "fields": [ + { + "id": 1, + "name": "header", + "type": "RequestHeader" + }, + { + "id": 2, + "name": "need_buckets", + "type": "bool" + }, + { + "id": 3, + "name": "region_keys", + "type": "bytes", + "is_repeated": true + }, + { + "id": 4, + "name": "region_ids", + "type": "uint64", + "is_repeated": true + } + ] + }, + { + "name": "QueryRegionResponse", + "fields": [ + { + "id": 1, + "name": "header", + "type": "ResponseHeader" + } + ], + "maps": [ + { + "key_type": "bytes", + "field": { + "id": 2, + "name": "regions_by_key", + "type": "RegionResponse" + } + }, + { + "key_type": "uint64", + "field": { + "id": 3, + "name": "regions_by_id", + "type": "RegionResponse" + } + } + ] + }, + { + "name": "RegionResponse", + "fields": [ + { + "id": 1, + "name": "region", + "type": "metapb.Region" + }, + { + "id": 2, + "name": "leader", + "type": "metapb.Peer" + }, + { + "id": 3, + "name": "down_peers", + "type": "PeerStats", + "is_repeated": true + }, + { + "id": 4, + "name": "pending_peers", + "type": "metapb.Peer", + "is_repeated": true + }, + { + "id": 5, + "name": "buckets", + "type": "metapb.Buckets" + } + ] + }, { "name": "ScanRegionsRequest", "fields": [ @@ -16494,6 +16581,13 @@ "in_type": "GetRegionByIDRequest", "out_type": "GetRegionResponse" }, + { + "name": "QueryRegion", + "in_type": "QueryRegionRequest", + "out_type": "QueryRegionResponse", + "in_streamed": true, + "out_streamed": true + }, { "name": "ScanRegions", "in_type": "ScanRegionsRequest",