From 815c37a4c912d727896d063a4055d186500368a0 Mon Sep 17 00:00:00 2001 From: arturovt Date: Tue, 11 Jul 2023 20:15:19 +0300 Subject: [PATCH] refactor(websocket-plugin): get rid off `rxjs/webSocket` and use `WebSocket` directly This commit updated the WebSocket handler implementation and switched to using the native `WebSocket` directly, instead of relying on `rxjs/webSocket`. The RxJS WebSocket implementation can be cumbersome when it comes to handling socket events and lacks simplicity. By handling socket events manually, we avoid the need to use the RxJS implementation. --- packages/websocket-plugin/src/symbols.ts | 12 +- .../websocket-plugin/src/websocket-handler.ts | 236 +++++++++--------- .../websocket-plugin/src/websocket.module.ts | 19 +- 3 files changed, 133 insertions(+), 134 deletions(-) diff --git a/packages/websocket-plugin/src/symbols.ts b/packages/websocket-plugin/src/symbols.ts index 5a4849a9c..dea63cc67 100644 --- a/packages/websocket-plugin/src/symbols.ts +++ b/packages/websocket-plugin/src/symbols.ts @@ -1,6 +1,12 @@ import { InjectionToken } from '@angular/core'; -export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken('NGXS_WEBSOCKET_OPTIONS'); +declare const ngDevMode: boolean; + +const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode; + +export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken( + NG_DEV_MODE ? 'NGXS_WEBSOCKET_OPTIONS' : '' +); export interface NgxsWebsocketPluginOptions { /** @@ -52,10 +58,6 @@ export interface NgxsWebsocketPluginOptions { deserializer?: (e: MessageEvent) => any; } -export function noop(..._args: any[]) { - return function() {}; -} - /** * Action to connect to the websocket. Optionally pass a URL. */ diff --git a/packages/websocket-plugin/src/websocket-handler.ts b/packages/websocket-plugin/src/websocket-handler.ts index 04ee2bba9..8eef65d96 100644 --- a/packages/websocket-plugin/src/websocket-handler.ts +++ b/packages/websocket-plugin/src/websocket-handler.ts @@ -1,8 +1,7 @@ -import { Injectable, Inject, OnDestroy } from '@angular/core'; +import { Injectable, Inject, OnDestroy, NgZone } from '@angular/core'; import { Actions, Store, getValue, ofActionDispatched } from '@ngxs/store'; -import { Subscription } from 'rxjs'; - -import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; +import { Subject, fromEvent } from 'rxjs'; +import { takeUntil } from 'rxjs/operators'; import { ConnectWebSocket, @@ -17,157 +16,156 @@ import { WebSocketConnected } from './symbols'; -@Injectable() +@Injectable({ providedIn: 'root' }) export class WebSocketHandler implements OnDestroy { - private socket: WebSocketSubject | null = null; - - private config: WebSocketSubjectConfig = { - url: this.options.url!, - protocol: this.options.protocol, - // Default binary type is `blob` for the global `WebSocket` - binaryType: this.options.binaryType, - serializer: this.options.serializer, - deserializer: this.options.deserializer, - closeObserver: { - next: () => { - // ATTENTION! - // See https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/dom/WebSocketSubject.ts#L340 - // RxJS socket emits `onComplete` event only if `event.wasClean` is truthy - // and doesn't complete socket subject if it's falsy - this.disconnect(); - } - }, - openObserver: { - next: () => this.store.dispatch(new WebSocketConnected()) - } - }; + private _socket: WebSocket | null = null; - private typeKey = this.options.typeKey!; + private readonly _socketClosed$ = new Subject(); - private subscription = new Subscription(); + private readonly _typeKey = this._options.typeKey!; + + private readonly _destroy$ = new Subject(); constructor( - private store: Store, - private actions$: Actions, - @Inject(NGXS_WEBSOCKET_OPTIONS) private options: NgxsWebsocketPluginOptions + private _store: Store, + private _ngZone: NgZone, + private _actions$: Actions, + @Inject(NGXS_WEBSOCKET_OPTIONS) private _options: NgxsWebsocketPluginOptions ) { - this.setupActionsListeners(); + this._setupActionsListeners(); } ngOnDestroy(): void { - this.closeConnection(); - this.subscription.unsubscribe(); + this._disconnect(/* forcelyCloseSocket */ true); + this._destroy$.next(); } - private setupActionsListeners(): void { - this.subscription.add( - this.actions$.pipe(ofActionDispatched(ConnectWebSocket)).subscribe(({ payload }) => { + private _setupActionsListeners(): void { + this._actions$ + .pipe(ofActionDispatched(ConnectWebSocket), takeUntil(this._destroy$)) + .subscribe(({ payload }) => { this.connect(payload); - }) - ); + }); - this.subscription.add( - this.actions$.pipe(ofActionDispatched(DisconnectWebSocket)).subscribe(() => { - this.disconnect(); - }) - ); + this._actions$ + .pipe(ofActionDispatched(DisconnectWebSocket), takeUntil(this._destroy$)) + .subscribe(() => { + this._disconnect(/* forcelyCloseSocket */ true); + }); - this.subscription.add( - this.actions$.pipe(ofActionDispatched(SendWebSocketMessage)).subscribe(({ payload }) => { + this._actions$ + .pipe(ofActionDispatched(SendWebSocketMessage), takeUntil(this._destroy$)) + .subscribe(({ payload }) => { this.send(payload); - }) - ); + }); } private connect(options?: NgxsWebsocketPluginOptions): void { - this.updateConnection(); + if (this._socket) { + this._closeConnection(/* forcelyCloseSocket */ true); + this._store.dispatch(new WebSocketConnectionUpdated()); + } - // Users can pass the options in the connect method so - // if options aren't available at DI bootstrap they have access - // to pass them here + // TODO(arturovt): we should not override default config values because this breaks support for having multiple socket connections. if (options) { - this.mergeConfigWithOptions(options); + if (options.serializer) { + this._options.serializer = options.serializer; + } + + if (options.deserializer) { + this._options.deserializer = options.deserializer; + } } - this.socket = new WebSocketSubject(this.config); - - this.socket.subscribe({ - next: (message: any) => { - const type = getValue(message, this.typeKey); - if (!type) { - throw new TypeKeyPropertyMissingError(this.typeKey); - } - this.store.dispatch({ ...message, type }); - }, - error: (error: any) => { - if (error instanceof CloseEvent) { - this.dispatchWebSocketDisconnected(); - } else { - this.store.dispatch(new WebsocketMessageError(error)); - } + this._ngZone.runOutsideAngular(() => { + // We either use options provided in the `ConnectWebSocket` action + // or fallback to default config values. + const url = options?.url || this._options.url!; + const protocol = options?.protocol || this._options.protocol; + const binaryType = options?.binaryType || this._options.binaryType; + + const socket = (this._socket = protocol + ? new WebSocket(url, protocol) + : new WebSocket(url)); + + if (binaryType) { + socket.binaryType = binaryType; } + + fromEvent(socket, 'open') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(() => this._store.dispatch(new WebSocketConnected())); + + fromEvent(socket, 'message') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(event => { + const message = this._options.deserializer!(event); + const type = getValue(message, this._typeKey); + if (!type) { + throw new TypeKeyPropertyMissingError(this._typeKey); + } + this._store.dispatch({ ...message, type }); + }); + + fromEvent(socket, 'error') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(error => { + // The error event indicates that an error has occurred during the + // WebSocket communication, and it is often appropriate to close the + // WebSocket connection when such an error occurs. + // We need to call `_disconnect()` after the error event has been fired. + // This ensures that the WebSocket connection is properly closed to prevent + // potential resource leaks. + this._disconnect(/* forcelyCloseSocket */ true); + this._store.dispatch(new WebsocketMessageError(error)); + }); + + fromEvent(socket, 'close') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(event => { + if (event.wasClean) { + // It is not necessary to call `socket.close()` after the `close` event + // has been fired. In fact, calling `socket.close()` within the `close` + // event handler or immediately after the event has been fired can lead + // to unexpected behavior. + this._disconnect(/* forcelyCloseSocket */ false); + } else { + // If the WebSocket `close` event has been fired and its `wasClean` + // property is falsy, it indicates that the WebSocket connection was + // closed in an unexpected or abnormal manner. + // We should call `socket.close()` in this scenario, we can ensure that + // the WebSocket connection is properly closed. + this._disconnect(/* forcelyCloseSocket */ true); + this._store.dispatch(new WebsocketMessageError(event)); + } + }); }); } - private disconnect(): void { - if (this.socket) { - this.closeConnection(); - this.dispatchWebSocketDisconnected(); + private _disconnect(forcelyCloseSocket: boolean): void { + if (this._socket) { + this._closeConnection(forcelyCloseSocket); + this._store.dispatch(new WebSocketDisconnected()); } } private send(data: any): void { - if (!this.socket) { + if (!this._socket) { throw new Error('You must connect to the socket before sending any data'); } - this.socket.next(data); - } - - /** - * Don't enlarge the `connect` method - */ - private mergeConfigWithOptions(options: NgxsWebsocketPluginOptions): void { - if (options.url) { - this.config.url = options.url; - } - - if (options.serializer) { - this.config.serializer = options.serializer; - } - - if (options.deserializer) { - this.config.deserializer = options.deserializer; + try { + this._socket.send(this._options.serializer!(data)); + } catch (error) { + this._store.dispatch(new WebsocketMessageError(error)); } } - /** - * To ensure we don't have any memory leaks - * e.g. if the user occasionally dispatched `ConnectWebSocket` twice - * then the previous subscription will still live in the memory - * to prevent such behavior - we close the previous connection if it exists - */ - private updateConnection(): void { - if (this.socket) { - this.closeConnection(); - this.store.dispatch(new WebSocketConnectionUpdated()); - } - } - - /** - * Used in many places so it's better to move the code into function - */ - private dispatchWebSocketDisconnected(): void { - this.store.dispatch(new WebSocketDisconnected()); - } - - private closeConnection(): void { - // `socket.complete()` closes the connection - // also it doesn't invoke the `onComplete` callback that we passed - // into `socket.subscribe(...)` - if (this.socket !== null) { - this.socket.complete(); - this.socket = null; + private _closeConnection(forcelyCloseSocket: boolean): void { + if (forcelyCloseSocket) { + this._socket?.close(); } + this._socket = null; + this._socketClosed$.next(); } } diff --git a/packages/websocket-plugin/src/websocket.module.ts b/packages/websocket-plugin/src/websocket.module.ts index 0e251c0c1..5205699b6 100644 --- a/packages/websocket-plugin/src/websocket.module.ts +++ b/packages/websocket-plugin/src/websocket.module.ts @@ -1,7 +1,7 @@ -import { NgModule, ModuleWithProviders, APP_INITIALIZER, InjectionToken } from '@angular/core'; +import { NgModule, ModuleWithProviders, InjectionToken } from '@angular/core'; import { WebSocketHandler } from './websocket-handler'; -import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS, noop } from './symbols'; +import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS } from './symbols'; export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) { return { @@ -18,17 +18,22 @@ export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) { }; } -export const USER_OPTIONS = new InjectionToken('USER_OPTIONS'); +declare const ngDevMode: boolean; + +const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode; + +export const USER_OPTIONS = new InjectionToken(NG_DEV_MODE ? 'USER_OPTIONS' : ''); @NgModule() export class NgxsWebsocketPluginModule { + constructor(_handler: WebSocketHandler) {} + static forRoot( options?: NgxsWebsocketPluginOptions ): ModuleWithProviders { return { ngModule: NgxsWebsocketPluginModule, providers: [ - WebSocketHandler, { provide: USER_OPTIONS, useValue: options @@ -37,12 +42,6 @@ export class NgxsWebsocketPluginModule { provide: NGXS_WEBSOCKET_OPTIONS, useFactory: websocketOptionsFactory, deps: [USER_OPTIONS] - }, - { - provide: APP_INITIALIZER, - useFactory: noop, - deps: [WebSocketHandler], - multi: true } ] };