From 0d3c2c76e66b5164809aa076924f6fe4922963ea Mon Sep 17 00:00:00 2001 From: Alexander Trost Date: Sun, 22 Sep 2024 13:25:39 +0200 Subject: [PATCH] fix: improve grpcws cancel logic in frontend Signed-off-by: Alexander Trost --- app/composables/grpcws/bridge/index.ts | 127 +++++++++++------- .../transports/websocket/websocketChannel.ts | 9 ++ 2 files changed, 84 insertions(+), 52 deletions(-) diff --git a/app/composables/grpcws/bridge/index.ts b/app/composables/grpcws/bridge/index.ts index 162a0a11a..a9e186092 100644 --- a/app/composables/grpcws/bridge/index.ts +++ b/app/composables/grpcws/bridge/index.ts @@ -16,6 +16,7 @@ import { import type { UseWebSocketReturn } from '@vueuse/core'; import { Metadata } from '~/composables/grpcws/metadata'; import type { GrpcWSOptions } from '../../grpcws/bridge/options'; +import { errCancelled, errInternal } from '../errors'; import type { Transport, TransportFactory } from '../transports/transport'; import { WebsocketChannelTransport } from '../transports/websocket/websocketChannel'; import { createGrpcStatus, createGrpcTrailers } from './utils'; @@ -64,35 +65,7 @@ export class GrpcWSTransport implements RpcTransport { } unary(method: MethodInfo, input: I, options: RpcOptions): UnaryCall { - const opt = options as GrpcWSOptions, - transport = this.wsTs({ - methodDefinition: method, - debug: opt.debug, - url: '', - - onChunk(chunkBytes) { - defHeader.resolvePending({}); - defTrailer.resolvePending({}); - defStatus.resolvePending(createGrpcStatus(new Metadata())); - defMessage.resolve(method.O.fromBinary(chunkBytes, opt.binaryOptions)); - }, - onEnd(err) { - if (err && err instanceof Error) { - defHeader.rejectPending(err); - defMessage.rejectPending(err); - defStatus.rejectPending(err); - defTrailer.rejectPending(err); - defMessage.rejectPending(err); - return; - } - }, - onHeaders(headers: Metadata, _: number): void { - defHeader.resolvePending(headers.headersMap); - - defStatus.resolvePending(createGrpcStatus(headers)); - defTrailer.resolvePending(createGrpcTrailers(headers)); - }, - }); + const opt = options as GrpcWSOptions; const meta = opt.meta ?? {}, defHeader = new Deferred(), @@ -111,11 +84,42 @@ export class GrpcWSTransport implements RpcTransport { const abort = opt.abort || (opt.timeout ? AbortSignal.timeout(opt.timeout) : undefined); if (abort) { - abort.addEventListener('abort', (_) => { - transport.cancel(); - }); + abort.addEventListener('abort', () => transport.cancel()); } + const transport = this.wsTs({ + methodDefinition: method, + debug: opt.debug, + url: '', + + onChunk(chunkBytes) { + defHeader.resolvePending({}); + defTrailer.resolvePending({}); + defStatus.resolvePending(createGrpcStatus(new Metadata())); + defMessage.resolve(method.O.fromBinary(chunkBytes, opt.binaryOptions)); + }, + onEnd(err) { + if (err instanceof Error) { + if (err.name === 'AbortError') { + err = errCancelled; + } + } else { + err = errInternal; + } + + defHeader.rejectPending(err); + defMessage.rejectPending(err); + defStatus.rejectPending(err); + defTrailer.rejectPending(err); + }, + onHeaders(headers: Metadata, _: number): void { + defHeader.resolvePending(headers.headersMap); + + defStatus.resolvePending(createGrpcStatus(headers)); + defTrailer.resolvePending(createGrpcTrailers(headers)); + }, + }); + transport.start(new Metadata()); transport.sendMessage(method.I.toBinary(input, opt.binaryOptions), true); @@ -142,17 +146,24 @@ export class GrpcWSTransport implements RpcTransport { outStream.notifyMessage(method.O.fromBinary(chunkBytes, opt.binaryOptions)); }, onEnd(err) { - if (err && err instanceof Error) { - defHeader.rejectPending(err); - outStream.notifyError(err); - defStatus.rejectPending(err); - defTrailer.rejectPending(err); - return; + if (err instanceof Error) { + if (err.name === 'AbortError') { + err = errCancelled; + } + } else { + err = errInternal; } + defHeader.rejectPending(err); + defStatus.rejectPending(err); if (!outStream.closed) { - outStream.notifyComplete(); + if (err) { + outStream.notifyError(err); + } else { + outStream.notifyComplete(); + } } + defTrailer.rejectPending(err); }, onHeaders(headers: Metadata, _: number): void { defHeader.resolvePending(headers.headersMap); @@ -203,14 +214,19 @@ export class GrpcWSTransport implements RpcTransport { defMessage.resolve(method.O.fromBinary(chunkBytes, opt.binaryOptions)); }, onEnd(err) { - if (err && err instanceof Error) { - defHeader.rejectPending(err); - defMessage.rejectPending(err); - defStatus.rejectPending(err); - defTrailer.rejectPending(err); - return; + if (err instanceof Error) { + if (err.name === 'AbortError') { + err = errCancelled; + } + } else { + err = errInternal; } + defHeader.rejectPending(err); + defMessage.rejectPending(err); + defStatus.rejectPending(err); + defTrailer.rejectPending(err); + defMessage.resolve(method.O.create()); }, onHeaders(headers: Metadata, _: number): void { @@ -259,17 +275,24 @@ export class GrpcWSTransport implements RpcTransport { outStream.notifyMessage(method.O.fromBinary(chunkBytes, opt.binaryOptions)); }, onEnd(err) { - if (err && err instanceof Error) { - defHeader.rejectPending(err); - outStream.notifyError(err); - defStatus.rejectPending(err); - defTrailer.rejectPending(err); - return; + if (err instanceof Error) { + if (err.name === 'AbortError') { + err = errCancelled; + } + } else { + err = errInternal; } + defHeader.rejectPending(err); + defStatus.rejectPending(err); if (!outStream.closed) { - outStream.notifyComplete(); + if (err) { + outStream.notifyError(err); + } else { + outStream.notifyComplete(); + } } + defTrailer.rejectPending(err); }, onHeaders(headers: Metadata, _: number): void { defHeader.resolvePending(headers.headersMap); diff --git a/app/composables/grpcws/transports/websocket/websocketChannel.ts b/app/composables/grpcws/transports/websocket/websocketChannel.ts index e2b83c4dc..70c00a20f 100644 --- a/app/composables/grpcws/transports/websocket/websocketChannel.ts +++ b/app/composables/grpcws/transports/websocket/websocketChannel.ts @@ -36,6 +36,8 @@ interface GrpcStream extends Transport { readonly service: string; readonly method: string; readonly isStream: boolean; + + closed: boolean; } interface WebsocketChannel { @@ -109,6 +111,7 @@ class WebsocketChannelImpl implements WebsocketChannel { stream[0].debug && this.logger.debug('Received complete for stream', streamId); stream[0].onEnd(); + stream[1].closed = true; // Remove completed stream this.activeStreams.delete(streamId); break; @@ -175,6 +178,8 @@ class WebsocketChannelStream { method: string; isStream: boolean; + closed: boolean = false; + constructor(wsChannel: WebsocketChannelImpl, logger: ILogger, streamId: number, opts: TransportOptions) { this.wsChannel = wsChannel; this.logger = logger; @@ -251,6 +256,10 @@ class WebsocketChannelStream { } async cancel() { + if (this.closed) { + return; + } + this.opts.debug && this.logger.debug('Stream cancel', this.streamId); this.opts.onEnd(errCancelled);