From 1491c92831f8a729623d18e22e6252111b6c8ad7 Mon Sep 17 00:00:00 2001 From: maybaby Date: Wed, 20 Jul 2022 16:54:56 +0800 Subject: [PATCH] fix: add python stdio implement --- client.go | 2 + examples/grpc/README.md | 2 +- examples/grpc/main.go | 1 + examples/grpc/plugin-python/client.py | 22 ++ examples/grpc/plugin-python/grpc_stdio_pb2.py | 42 +++ .../grpc/plugin-python/grpc_stdio_pb2_grpc.py | 81 +++++ examples/grpc/plugin-python/kv_pb2.py | 326 ++---------------- examples/grpc/plugin-python/kv_pb2_grpc.py | 120 +++++-- examples/grpc/plugin-python/plugin.py | 75 +++- examples/grpc/proto/grpc_stdio.proto | 30 ++ grpc_stdio.go | 2 +- 11 files changed, 366 insertions(+), 337 deletions(-) create mode 100644 examples/grpc/plugin-python/client.py create mode 100644 examples/grpc/plugin-python/grpc_stdio_pb2.py create mode 100644 examples/grpc/plugin-python/grpc_stdio_pb2_grpc.py create mode 100644 examples/grpc/proto/grpc_stdio.proto diff --git a/client.go b/client.go index e0bee88a..b8bc657f 100644 --- a/client.go +++ b/client.go @@ -369,6 +369,8 @@ func (c *Client) Client() (ClientProtocol, error) { c.client, err = newRPCClient(c) case ProtocolGRPC: + //c.client, err = newRPCClient(c) + // c.client, err = newGRPCClient(c.doneCtx, c) default: diff --git a/examples/grpc/README.md b/examples/grpc/README.md index 7b3b647a..a06342b5 100644 --- a/examples/grpc/README.md +++ b/examples/grpc/README.md @@ -67,5 +67,5 @@ $ protoc -I proto/ proto/kv.proto --go_out=plugins=grpc:proto/ For Python: ```sh -$ python -m grpc_tools.protoc -I ./proto/ --python_out=./plugin-python/ --grpc_python_out=./plugin-python/ ./proto/kv.proto +$ python -m grpc_tools.protoc -I ./proto/ --python_out=./plugin-python/ --grpc_python_out=./plugin-python/ ./proto/kv.proto ./proto/grpc_stdio.proto ``` diff --git a/examples/grpc/main.go b/examples/grpc/main.go index cc91d71f..75faefe0 100644 --- a/examples/grpc/main.go +++ b/examples/grpc/main.go @@ -19,6 +19,7 @@ func main() { client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: shared.Handshake, Plugins: shared.PluginMap, + SyncStdout: os.Stdout, Cmd: exec.Command("sh", "-c", os.Getenv("KV_PLUGIN")), AllowedProtocols: []plugin.Protocol{ plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, diff --git a/examples/grpc/plugin-python/client.py b/examples/grpc/plugin-python/client.py new file mode 100644 index 00000000..e2202b19 --- /dev/null +++ b/examples/grpc/plugin-python/client.py @@ -0,0 +1,22 @@ + +import kv_pb2 +import kv_pb2_grpc + + +import grpc + + + +def run(): + # NOTE(gRPC Python Team): .close() is possible on a channel and should be + # used in circumstances in which the with statement does not fit the needs + # of the code. + with grpc.insecure_channel('localhost:1234') as channel: + stub = kv_pb2_grpc.KVStub(channel) + stub.Put(kv_pb2.PutRequest(key="cc",value=b"1")) + response = stub.Get(kv_pb2.GetRequest(key="cc")) + + print("Greeter client received: " + str(response.value)) + +if __name__ == '__main__': + run() diff --git a/examples/grpc/plugin-python/grpc_stdio_pb2.py b/examples/grpc/plugin-python/grpc_stdio_pb2.py new file mode 100644 index 00000000..00c08705 --- /dev/null +++ b/examples/grpc/plugin-python/grpc_stdio_pb2.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: grpc_stdio.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10grpc_stdio.proto\x12\x06plugin\x1a\x1bgoogle/protobuf/empty.proto\"u\n\tStdioData\x12*\n\x07\x63hannel\x18\x01 \x01(\x0e\x32\x19.plugin.StdioData.Channel\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\".\n\x07\x43hannel\x12\x0b\n\x07INVALID\x10\x00\x12\n\n\x06STDOUT\x10\x01\x12\n\n\x06STDERR\x10\x02\x32G\n\tGRPCStdio\x12:\n\x0bStreamStdio\x12\x16.google.protobuf.Empty\x1a\x11.plugin.StdioData0\x01\x42\x08Z\x06pluginb\x06proto3') + + + +_STDIODATA = DESCRIPTOR.message_types_by_name['StdioData'] +_STDIODATA_CHANNEL = _STDIODATA.enum_types_by_name['Channel'] +StdioData = _reflection.GeneratedProtocolMessageType('StdioData', (_message.Message,), { + 'DESCRIPTOR' : _STDIODATA, + '__module__' : 'grpc_stdio_pb2' + # @@protoc_insertion_point(class_scope:plugin.StdioData) + }) +_sym_db.RegisterMessage(StdioData) + +_GRPCSTDIO = DESCRIPTOR.services_by_name['GRPCStdio'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'Z\006plugin' + _STDIODATA._serialized_start=57 + _STDIODATA._serialized_end=174 + _STDIODATA_CHANNEL._serialized_start=128 + _STDIODATA_CHANNEL._serialized_end=174 + _GRPCSTDIO._serialized_start=176 + _GRPCSTDIO._serialized_end=247 +# @@protoc_insertion_point(module_scope) diff --git a/examples/grpc/plugin-python/grpc_stdio_pb2_grpc.py b/examples/grpc/plugin-python/grpc_stdio_pb2_grpc.py new file mode 100644 index 00000000..9d8d4357 --- /dev/null +++ b/examples/grpc/plugin-python/grpc_stdio_pb2_grpc.py @@ -0,0 +1,81 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +import grpc_stdio_pb2 as grpc__stdio__pb2 + + +class GRPCStdioStub(object): + """GRPCStdio is a service that is automatically run by the plugin process + to stream any stdout/err data so that it can be mirrored on the plugin + host side. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.StreamStdio = channel.unary_stream( + '/plugin.GRPCStdio/StreamStdio', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=grpc__stdio__pb2.StdioData.FromString, + ) + + +class GRPCStdioServicer(object): + """GRPCStdio is a service that is automatically run by the plugin process + to stream any stdout/err data so that it can be mirrored on the plugin + host side. + """ + + def StreamStdio(self, request, context): + """StreamStdio returns a stream that contains all the stdout/stderr. + This RPC endpoint must only be called ONCE. Once stdio data is consumed + it is not sent again. + + Callers should connect early to prevent blocking on the plugin process. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GRPCStdioServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StreamStdio': grpc.unary_stream_rpc_method_handler( + servicer.StreamStdio, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=grpc__stdio__pb2.StdioData.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'plugin.GRPCStdio', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class GRPCStdio(object): + """GRPCStdio is a service that is automatically run by the plugin process + to stream any stdout/err data so that it can be mirrored on the plugin + host side. + """ + + @staticmethod + def StreamStdio(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/plugin.GRPCStdio/StreamStdio', + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + grpc__stdio__pb2.StdioData.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/examples/grpc/plugin-python/kv_pb2.py b/examples/grpc/plugin-python/kv_pb2.py index bee04c0b..b041a1b8 100644 --- a/examples/grpc/plugin-python/kv_pb2.py +++ b/examples/grpc/plugin-python/kv_pb2.py @@ -1,13 +1,12 @@ +# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: kv.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +"""Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -15,303 +14,54 @@ -DESCRIPTOR = _descriptor.FileDescriptor( - name='kv.proto', - package='proto', - syntax='proto3', - serialized_pb=_b('\n\x08kv.proto\x12\x05proto\"\x19\n\nGetRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\x1c\n\x0bGetResponse\x12\r\n\x05value\x18\x01 \x01(\x0c\"(\n\nPutRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\"\x07\n\x05\x45mpty2Z\n\x02KV\x12,\n\x03Get\x12\x11.proto.GetRequest\x1a\x12.proto.GetResponse\x12&\n\x03Put\x12\x11.proto.PutRequest\x1a\x0c.proto.Emptyb\x06proto3') -) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - - - - -_GETREQUEST = _descriptor.Descriptor( - name='GetRequest', - full_name='proto.GetRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='key', full_name='proto.GetRequest.key', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=19, - serialized_end=44, -) - - -_GETRESPONSE = _descriptor.Descriptor( - name='GetResponse', - full_name='proto.GetResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='value', full_name='proto.GetResponse.value', index=0, - number=1, type=12, cpp_type=9, label=1, - has_default_value=False, default_value=_b(""), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=46, - serialized_end=74, -) - - -_PUTREQUEST = _descriptor.Descriptor( - name='PutRequest', - full_name='proto.PutRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='key', full_name='proto.PutRequest.key', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='value', full_name='proto.PutRequest.value', index=1, - number=2, type=12, cpp_type=9, label=1, - has_default_value=False, default_value=_b(""), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=76, - serialized_end=116, -) +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x08kv.proto\x12\x05proto\"\x19\n\nGetRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\x1c\n\x0bGetResponse\x12\r\n\x05value\x18\x01 \x01(\x0c\"(\n\nPutRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\"\x07\n\x05\x45mpty2Z\n\x02KV\x12,\n\x03Get\x12\x11.proto.GetRequest\x1a\x12.proto.GetResponse\x12&\n\x03Put\x12\x11.proto.PutRequest\x1a\x0c.proto.Emptyb\x06proto3') -_EMPTY = _descriptor.Descriptor( - name='Empty', - full_name='proto.Empty', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=118, - serialized_end=125, -) -DESCRIPTOR.message_types_by_name['GetRequest'] = _GETREQUEST -DESCRIPTOR.message_types_by_name['GetResponse'] = _GETRESPONSE -DESCRIPTOR.message_types_by_name['PutRequest'] = _PUTREQUEST -DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY - -GetRequest = _reflection.GeneratedProtocolMessageType('GetRequest', (_message.Message,), dict( - DESCRIPTOR = _GETREQUEST, - __module__ = 'kv_pb2' +_GETREQUEST = DESCRIPTOR.message_types_by_name['GetRequest'] +_GETRESPONSE = DESCRIPTOR.message_types_by_name['GetResponse'] +_PUTREQUEST = DESCRIPTOR.message_types_by_name['PutRequest'] +_EMPTY = DESCRIPTOR.message_types_by_name['Empty'] +GetRequest = _reflection.GeneratedProtocolMessageType('GetRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETREQUEST, + '__module__' : 'kv_pb2' # @@protoc_insertion_point(class_scope:proto.GetRequest) - )) + }) _sym_db.RegisterMessage(GetRequest) -GetResponse = _reflection.GeneratedProtocolMessageType('GetResponse', (_message.Message,), dict( - DESCRIPTOR = _GETRESPONSE, - __module__ = 'kv_pb2' +GetResponse = _reflection.GeneratedProtocolMessageType('GetResponse', (_message.Message,), { + 'DESCRIPTOR' : _GETRESPONSE, + '__module__' : 'kv_pb2' # @@protoc_insertion_point(class_scope:proto.GetResponse) - )) + }) _sym_db.RegisterMessage(GetResponse) -PutRequest = _reflection.GeneratedProtocolMessageType('PutRequest', (_message.Message,), dict( - DESCRIPTOR = _PUTREQUEST, - __module__ = 'kv_pb2' +PutRequest = _reflection.GeneratedProtocolMessageType('PutRequest', (_message.Message,), { + 'DESCRIPTOR' : _PUTREQUEST, + '__module__' : 'kv_pb2' # @@protoc_insertion_point(class_scope:proto.PutRequest) - )) + }) _sym_db.RegisterMessage(PutRequest) -Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), dict( - DESCRIPTOR = _EMPTY, - __module__ = 'kv_pb2' +Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), { + 'DESCRIPTOR' : _EMPTY, + '__module__' : 'kv_pb2' # @@protoc_insertion_point(class_scope:proto.Empty) - )) + }) _sym_db.RegisterMessage(Empty) - -try: - # THESE ELEMENTS WILL BE DEPRECATED. - # Please use the generated *_pb2_grpc.py files instead. - import grpc - from grpc.beta import implementations as beta_implementations - from grpc.beta import interfaces as beta_interfaces - from grpc.framework.common import cardinality - from grpc.framework.interfaces.face import utilities as face_utilities - - - class KVStub(object): - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Get = channel.unary_unary( - '/proto.KV/Get', - request_serializer=GetRequest.SerializeToString, - response_deserializer=GetResponse.FromString, - ) - self.Put = channel.unary_unary( - '/proto.KV/Put', - request_serializer=PutRequest.SerializeToString, - response_deserializer=Empty.FromString, - ) - - - class KVServicer(object): - - def Get(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def Put(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - - def add_KVServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Get': grpc.unary_unary_rpc_method_handler( - servicer.Get, - request_deserializer=GetRequest.FromString, - response_serializer=GetResponse.SerializeToString, - ), - 'Put': grpc.unary_unary_rpc_method_handler( - servicer.Put, - request_deserializer=PutRequest.FromString, - response_serializer=Empty.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'proto.KV', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - class BetaKVServicer(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def Get(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - def Put(self, request, context): - context.code(beta_interfaces.StatusCode.UNIMPLEMENTED) - - - class BetaKVStub(object): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This class was generated - only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0.""" - def Get(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - Get.future = None - def Put(self, request, timeout, metadata=None, with_call=False, protocol_options=None): - raise NotImplementedError() - Put.future = None - - - def beta_create_KV_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_deserializers = { - ('proto.KV', 'Get'): GetRequest.FromString, - ('proto.KV', 'Put'): PutRequest.FromString, - } - response_serializers = { - ('proto.KV', 'Get'): GetResponse.SerializeToString, - ('proto.KV', 'Put'): Empty.SerializeToString, - } - method_implementations = { - ('proto.KV', 'Get'): face_utilities.unary_unary_inline(servicer.Get), - ('proto.KV', 'Put'): face_utilities.unary_unary_inline(servicer.Put), - } - server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout) - return beta_implementations.server(method_implementations, options=server_options) - - - def beta_create_KV_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None): - """The Beta API is deprecated for 0.15.0 and later. - - It is recommended to use the GA API (classes and functions in this - file not marked beta) for all further purposes. This function was - generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0""" - request_serializers = { - ('proto.KV', 'Get'): GetRequest.SerializeToString, - ('proto.KV', 'Put'): PutRequest.SerializeToString, - } - response_deserializers = { - ('proto.KV', 'Get'): GetResponse.FromString, - ('proto.KV', 'Put'): Empty.FromString, - } - cardinalities = { - 'Get': cardinality.Cardinality.UNARY_UNARY, - 'Put': cardinality.Cardinality.UNARY_UNARY, - } - stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size) - return beta_implementations.dynamic_stub(channel, 'proto.KV', cardinalities, options=stub_options) -except ImportError: - pass +_KV = DESCRIPTOR.services_by_name['KV'] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _GETREQUEST._serialized_start=19 + _GETREQUEST._serialized_end=44 + _GETRESPONSE._serialized_start=46 + _GETRESPONSE._serialized_end=74 + _PUTREQUEST._serialized_start=76 + _PUTREQUEST._serialized_end=116 + _EMPTY._serialized_start=118 + _EMPTY._serialized_end=125 + _KV._serialized_start=127 + _KV._serialized_end=217 # @@protoc_insertion_point(module_scope) diff --git a/examples/grpc/plugin-python/kv_pb2_grpc.py b/examples/grpc/plugin-python/kv_pb2_grpc.py index cc331c85..be690a2e 100644 --- a/examples/grpc/plugin-python/kv_pb2_grpc.py +++ b/examples/grpc/plugin-python/kv_pb2_grpc.py @@ -1,55 +1,99 @@ # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" import grpc import kv_pb2 as kv__pb2 class KVStub(object): + """Missing associated documentation comment in .proto file.""" - def __init__(self, channel): - """Constructor. + def __init__(self, channel): + """Constructor. - Args: - channel: A grpc.Channel. - """ - self.Get = channel.unary_unary( - '/proto.KV/Get', - request_serializer=kv__pb2.GetRequest.SerializeToString, - response_deserializer=kv__pb2.GetResponse.FromString, - ) - self.Put = channel.unary_unary( - '/proto.KV/Put', - request_serializer=kv__pb2.PutRequest.SerializeToString, - response_deserializer=kv__pb2.Empty.FromString, - ) + Args: + channel: A grpc.Channel. + """ + self.Get = channel.unary_unary( + '/proto.KV/Get', + request_serializer=kv__pb2.GetRequest.SerializeToString, + response_deserializer=kv__pb2.GetResponse.FromString, + ) + self.Put = channel.unary_unary( + '/proto.KV/Put', + request_serializer=kv__pb2.PutRequest.SerializeToString, + response_deserializer=kv__pb2.Empty.FromString, + ) class KVServicer(object): + """Missing associated documentation comment in .proto file.""" - def Get(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + def Get(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') - def Put(self, request, context): - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + def Put(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') def add_KVServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Get': grpc.unary_unary_rpc_method_handler( - servicer.Get, - request_deserializer=kv__pb2.GetRequest.FromString, - response_serializer=kv__pb2.GetResponse.SerializeToString, - ), - 'Put': grpc.unary_unary_rpc_method_handler( - servicer.Put, - request_deserializer=kv__pb2.PutRequest.FromString, - response_serializer=kv__pb2.Empty.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'proto.KV', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) + rpc_method_handlers = { + 'Get': grpc.unary_unary_rpc_method_handler( + servicer.Get, + request_deserializer=kv__pb2.GetRequest.FromString, + response_serializer=kv__pb2.GetResponse.SerializeToString, + ), + 'Put': grpc.unary_unary_rpc_method_handler( + servicer.Put, + request_deserializer=kv__pb2.PutRequest.FromString, + response_serializer=kv__pb2.Empty.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'proto.KV', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class KV(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Get(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/proto.KV/Get', + kv__pb2.GetRequest.SerializeToString, + kv__pb2.GetResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def Put(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/proto.KV/Put', + kv__pb2.PutRequest.SerializeToString, + kv__pb2.Empty.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/examples/grpc/plugin-python/plugin.py b/examples/grpc/plugin-python/plugin.py index dbdf4f15..88c6c5c1 100644 --- a/examples/grpc/plugin-python/plugin.py +++ b/examples/grpc/plugin-python/plugin.py @@ -1,33 +1,87 @@ -from concurrent import futures +import grpc +import logging import sys import time - -import grpc - +from concurrent import futures +from grpc_health.v1 import health_pb2, health_pb2_grpc +from grpc_health.v1.health import HealthServicer +from io import StringIO +from queue import Queue +import queue +import grpc_stdio_pb2 +import grpc_stdio_pb2_grpc import kv_pb2 import kv_pb2_grpc +from logging.handlers import QueueHandler,QueueListener + + +class Logger: + def __init__(self): + self.stream = StringIO() # + que = Queue(-1) # no limit on size + self.queue_handler = QueueHandler(que) + self.handler = logging.StreamHandler() + self.listener = QueueListener(que, self.handler) + self.log = logging.getLogger('python-plugin') + self.log.setLevel(logging.DEBUG) + self.logFormatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(pathname)s:%(lineno)d - %(' + 'message)s') + self.handler.setFormatter(self.logFormatter) + for handler in self.log.handlers: + self.log.removeHandler(handler) + self.log.addHandler(self.queue_handler) + self.listener.start() + + def __del__(self): + self.listener.stop() + + def read(self): + self.handler.flush() + ret = self.logFormatter.format(self.listener.queue.get()) + "\n" + return ret.encode("utf-8") + + +logger = Logger() +log = logger.log -from grpc_health.v1.health import HealthServicer -from grpc_health.v1 import health_pb2, health_pb2_grpc class KVServicer(kv_pb2_grpc.KVServicer): """Implementation of KV service.""" def Get(self, request, context): - filename = "kv_"+request.key + filename = "kv_" + request.key + import time with open(filename, 'r+b') as f: result = kv_pb2.GetResponse() result.value = f.read() - return result + log.info(133) + time.sleep(10) + log.info(3333) + time.sleep(10) + + + return result def Put(self, request, context): - filename = "kv_"+request.key + filename = "kv_" + request.key value = "{0}\n\nWritten from plugin-python".format(request.value) with open(filename, 'w') as f: f.write(value) return kv_pb2.Empty() + +class StdioService(grpc_stdio_pb2_grpc.GRPCStdioServicer): + def __init__(self, log): + self.log = log + + def StreamStdio(self, request, context): + while True: + sd = grpc_stdio_pb2.StdioData(channel=1, data=self.log.read()) + time.sleep(0.2) + yield sd + + def serve(): # We need to build a health service to work with go-plugin health = HealthServicer() @@ -36,6 +90,8 @@ def serve(): # Start the server. server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) kv_pb2_grpc.add_KVServicer_to_server(KVServicer(), server) + + grpc_stdio_pb2_grpc.add_GRPCStdioServicer_to_server(StdioService(logger), server) health_pb2_grpc.add_HealthServicer_to_server(health, server) server.add_insecure_port('127.0.0.1:1234') server.start() @@ -50,5 +106,6 @@ def serve(): except KeyboardInterrupt: server.stop(0) + if __name__ == '__main__': serve() diff --git a/examples/grpc/proto/grpc_stdio.proto b/examples/grpc/proto/grpc_stdio.proto new file mode 100644 index 00000000..ce1a1223 --- /dev/null +++ b/examples/grpc/proto/grpc_stdio.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; +package plugin; +option go_package = "plugin"; + +import "google/protobuf/empty.proto"; + +// GRPCStdio is a service that is automatically run by the plugin process +// to stream any stdout/err data so that it can be mirrored on the plugin +// host side. +service GRPCStdio { + // StreamStdio returns a stream that contains all the stdout/stderr. + // This RPC endpoint must only be called ONCE. Once stdio data is consumed + // it is not sent again. + // + // Callers should connect early to prevent blocking on the plugin process. + rpc StreamStdio(google.protobuf.Empty) returns (stream StdioData); +} + +// StdioData is a single chunk of stdout or stderr data that is streamed +// from GRPCStdio. +message StdioData { + enum Channel { + INVALID = 0; + STDOUT = 1; + STDERR = 2; + } + + Channel channel = 1; + bytes data = 2; +} diff --git a/grpc_stdio.go b/grpc_stdio.go index a5821815..01c981cc 100644 --- a/grpc_stdio.go +++ b/grpc_stdio.go @@ -160,7 +160,7 @@ func (c *grpcStdioClient) Run(stdout, stderr io.Writer) { // Write! In the event of an error we just continue. if c.log.IsTrace() { - c.log.Trace("received data", "channel", data.Channel.String(), "len", len(data.Data)) + c.log.Trace("received plugin data", "channel", data.Channel.String(), "len", len(data.Data)) } if _, err := io.Copy(w, bytes.NewReader(data.Data)); err != nil { c.log.Error("failed to copy all bytes", "err", err)