Skip to content

Commit

Permalink
fix: improve grpcws cancel logic in frontend
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Trost <[email protected]>
  • Loading branch information
galexrt committed Sep 22, 2024
1 parent 5f8bac1 commit 0d3c2c7
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 52 deletions.
127 changes: 75 additions & 52 deletions app/composables/grpcws/bridge/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
import type { UseWebSocketReturn } from '@vueuse/core';
import { Metadata } from '~/composables/grpcws/metadata';
import type { GrpcWSOptions } from '../../grpcws/bridge/options';
import { errCancelled, errInternal } from '../errors';
import type { Transport, TransportFactory } from '../transports/transport';
import { WebsocketChannelTransport } from '../transports/websocket/websocketChannel';
import { createGrpcStatus, createGrpcTrailers } from './utils';
Expand Down Expand Up @@ -64,35 +65,7 @@ export class GrpcWSTransport implements RpcTransport {
}

unary<I extends object, O extends object>(method: MethodInfo<I, O>, input: I, options: RpcOptions): UnaryCall<I, O> {
const opt = options as GrpcWSOptions,
transport = this.wsTs({
methodDefinition: method,
debug: opt.debug,
url: '',

onChunk(chunkBytes) {
defHeader.resolvePending({});
defTrailer.resolvePending({});
defStatus.resolvePending(createGrpcStatus(new Metadata()));
defMessage.resolve(method.O.fromBinary(chunkBytes, opt.binaryOptions));
},
onEnd(err) {
if (err && err instanceof Error) {
defHeader.rejectPending(err);
defMessage.rejectPending(err);
defStatus.rejectPending(err);
defTrailer.rejectPending(err);
defMessage.rejectPending(err);
return;
}
},
onHeaders(headers: Metadata, _: number): void {
defHeader.resolvePending(headers.headersMap);

defStatus.resolvePending(createGrpcStatus(headers));
defTrailer.resolvePending(createGrpcTrailers(headers));
},
});
const opt = options as GrpcWSOptions;

const meta = opt.meta ?? {},
defHeader = new Deferred<RpcMetadata>(),
Expand All @@ -111,11 +84,42 @@ export class GrpcWSTransport implements RpcTransport {

const abort = opt.abort || (opt.timeout ? AbortSignal.timeout(opt.timeout) : undefined);
if (abort) {
abort.addEventListener('abort', (_) => {
transport.cancel();
});
abort.addEventListener('abort', () => transport.cancel());
}

const transport = this.wsTs({
methodDefinition: method,
debug: opt.debug,
url: '',

onChunk(chunkBytes) {
defHeader.resolvePending({});
defTrailer.resolvePending({});
defStatus.resolvePending(createGrpcStatus(new Metadata()));
defMessage.resolve(method.O.fromBinary(chunkBytes, opt.binaryOptions));
},
onEnd(err) {
if (err instanceof Error) {
if (err.name === 'AbortError') {
err = errCancelled;
}
} else {
err = errInternal;
}

defHeader.rejectPending(err);
defMessage.rejectPending(err);
defStatus.rejectPending(err);
defTrailer.rejectPending(err);
},
onHeaders(headers: Metadata, _: number): void {
defHeader.resolvePending(headers.headersMap);

defStatus.resolvePending(createGrpcStatus(headers));
defTrailer.resolvePending(createGrpcTrailers(headers));
},
});

transport.start(new Metadata());
transport.sendMessage(method.I.toBinary(input, opt.binaryOptions), true);

Expand All @@ -142,17 +146,24 @@ export class GrpcWSTransport implements RpcTransport {
outStream.notifyMessage(method.O.fromBinary(chunkBytes, opt.binaryOptions));
},
onEnd(err) {
if (err && err instanceof Error) {
defHeader.rejectPending(err);
outStream.notifyError(err);
defStatus.rejectPending(err);
defTrailer.rejectPending(err);
return;
if (err instanceof Error) {
if (err.name === 'AbortError') {
err = errCancelled;
}
} else {
err = errInternal;
}

defHeader.rejectPending(err);
defStatus.rejectPending(err);
if (!outStream.closed) {
outStream.notifyComplete();
if (err) {
outStream.notifyError(err);
} else {
outStream.notifyComplete();
}
}
defTrailer.rejectPending(err);
},
onHeaders(headers: Metadata, _: number): void {
defHeader.resolvePending(headers.headersMap);
Expand Down Expand Up @@ -203,14 +214,19 @@ export class GrpcWSTransport implements RpcTransport {
defMessage.resolve(method.O.fromBinary(chunkBytes, opt.binaryOptions));
},
onEnd(err) {
if (err && err instanceof Error) {
defHeader.rejectPending(err);
defMessage.rejectPending(err);
defStatus.rejectPending(err);
defTrailer.rejectPending(err);
return;
if (err instanceof Error) {
if (err.name === 'AbortError') {
err = errCancelled;
}
} else {
err = errInternal;
}

defHeader.rejectPending(err);
defMessage.rejectPending(err);
defStatus.rejectPending(err);
defTrailer.rejectPending(err);

defMessage.resolve(method.O.create());
},
onHeaders(headers: Metadata, _: number): void {
Expand Down Expand Up @@ -259,17 +275,24 @@ export class GrpcWSTransport implements RpcTransport {
outStream.notifyMessage(method.O.fromBinary(chunkBytes, opt.binaryOptions));
},
onEnd(err) {
if (err && err instanceof Error) {
defHeader.rejectPending(err);
outStream.notifyError(err);
defStatus.rejectPending(err);
defTrailer.rejectPending(err);
return;
if (err instanceof Error) {
if (err.name === 'AbortError') {
err = errCancelled;
}
} else {
err = errInternal;
}

defHeader.rejectPending(err);
defStatus.rejectPending(err);
if (!outStream.closed) {
outStream.notifyComplete();
if (err) {
outStream.notifyError(err);
} else {
outStream.notifyComplete();
}
}
defTrailer.rejectPending(err);
},
onHeaders(headers: Metadata, _: number): void {
defHeader.resolvePending(headers.headersMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ interface GrpcStream extends Transport {
readonly service: string;
readonly method: string;
readonly isStream: boolean;

closed: boolean;
}

interface WebsocketChannel {
Expand Down Expand Up @@ -109,6 +111,7 @@ class WebsocketChannelImpl implements WebsocketChannel {
stream[0].debug && this.logger.debug('Received complete for stream', streamId);

stream[0].onEnd();
stream[1].closed = true;
// Remove completed stream
this.activeStreams.delete(streamId);
break;
Expand Down Expand Up @@ -175,6 +178,8 @@ class WebsocketChannelStream {
method: string;
isStream: boolean;

closed: boolean = false;

constructor(wsChannel: WebsocketChannelImpl, logger: ILogger, streamId: number, opts: TransportOptions) {
this.wsChannel = wsChannel;
this.logger = logger;
Expand Down Expand Up @@ -251,6 +256,10 @@ class WebsocketChannelStream {
}

async cancel() {
if (this.closed) {
return;
}

this.opts.debug && this.logger.debug('Stream cancel', this.streamId);

this.opts.onEnd(errCancelled);
Expand Down

0 comments on commit 0d3c2c7

Please sign in to comment.