diff --git a/packages/websocket-plugin/src/symbols.ts b/packages/websocket-plugin/src/symbols.ts index 5a4849a9c..dea63cc67 100644 --- a/packages/websocket-plugin/src/symbols.ts +++ b/packages/websocket-plugin/src/symbols.ts @@ -1,6 +1,12 @@ import { InjectionToken } from '@angular/core'; -export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken('NGXS_WEBSOCKET_OPTIONS'); +declare const ngDevMode: boolean; + +const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode; + +export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken( + NG_DEV_MODE ? 'NGXS_WEBSOCKET_OPTIONS' : '' +); export interface NgxsWebsocketPluginOptions { /** @@ -52,10 +58,6 @@ export interface NgxsWebsocketPluginOptions { deserializer?: (e: MessageEvent) => any; } -export function noop(..._args: any[]) { - return function() {}; -} - /** * Action to connect to the websocket. Optionally pass a URL. */ diff --git a/packages/websocket-plugin/src/websocket-handler.ts b/packages/websocket-plugin/src/websocket-handler.ts index 04ee2bba9..8eef65d96 100644 --- a/packages/websocket-plugin/src/websocket-handler.ts +++ b/packages/websocket-plugin/src/websocket-handler.ts @@ -1,8 +1,7 @@ -import { Injectable, Inject, OnDestroy } from '@angular/core'; +import { Injectable, Inject, OnDestroy, NgZone } from '@angular/core'; import { Actions, Store, getValue, ofActionDispatched } from '@ngxs/store'; -import { Subscription } from 'rxjs'; - -import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; +import { Subject, fromEvent } from 'rxjs'; +import { takeUntil } from 'rxjs/operators'; import { ConnectWebSocket, @@ -17,157 +16,156 @@ import { WebSocketConnected } from './symbols'; -@Injectable() +@Injectable({ providedIn: 'root' }) export class WebSocketHandler implements OnDestroy { - private socket: WebSocketSubject | null = null; - - private config: WebSocketSubjectConfig = { - url: this.options.url!, - protocol: this.options.protocol, - // Default binary type is `blob` for the global `WebSocket` - binaryType: this.options.binaryType, - serializer: this.options.serializer, - deserializer: this.options.deserializer, - closeObserver: { - next: () => { - // ATTENTION! - // See https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/dom/WebSocketSubject.ts#L340 - // RxJS socket emits `onComplete` event only if `event.wasClean` is truthy - // and doesn't complete socket subject if it's falsy - this.disconnect(); - } - }, - openObserver: { - next: () => this.store.dispatch(new WebSocketConnected()) - } - }; + private _socket: WebSocket | null = null; - private typeKey = this.options.typeKey!; + private readonly _socketClosed$ = new Subject(); - private subscription = new Subscription(); + private readonly _typeKey = this._options.typeKey!; + + private readonly _destroy$ = new Subject(); constructor( - private store: Store, - private actions$: Actions, - @Inject(NGXS_WEBSOCKET_OPTIONS) private options: NgxsWebsocketPluginOptions + private _store: Store, + private _ngZone: NgZone, + private _actions$: Actions, + @Inject(NGXS_WEBSOCKET_OPTIONS) private _options: NgxsWebsocketPluginOptions ) { - this.setupActionsListeners(); + this._setupActionsListeners(); } ngOnDestroy(): void { - this.closeConnection(); - this.subscription.unsubscribe(); + this._disconnect(/* forcelyCloseSocket */ true); + this._destroy$.next(); } - private setupActionsListeners(): void { - this.subscription.add( - this.actions$.pipe(ofActionDispatched(ConnectWebSocket)).subscribe(({ payload }) => { + private _setupActionsListeners(): void { + this._actions$ + .pipe(ofActionDispatched(ConnectWebSocket), takeUntil(this._destroy$)) + .subscribe(({ payload }) => { this.connect(payload); - }) - ); + }); - this.subscription.add( - this.actions$.pipe(ofActionDispatched(DisconnectWebSocket)).subscribe(() => { - this.disconnect(); - }) - ); + this._actions$ + .pipe(ofActionDispatched(DisconnectWebSocket), takeUntil(this._destroy$)) + .subscribe(() => { + this._disconnect(/* forcelyCloseSocket */ true); + }); - this.subscription.add( - this.actions$.pipe(ofActionDispatched(SendWebSocketMessage)).subscribe(({ payload }) => { + this._actions$ + .pipe(ofActionDispatched(SendWebSocketMessage), takeUntil(this._destroy$)) + .subscribe(({ payload }) => { this.send(payload); - }) - ); + }); } private connect(options?: NgxsWebsocketPluginOptions): void { - this.updateConnection(); + if (this._socket) { + this._closeConnection(/* forcelyCloseSocket */ true); + this._store.dispatch(new WebSocketConnectionUpdated()); + } - // Users can pass the options in the connect method so - // if options aren't available at DI bootstrap they have access - // to pass them here + // TODO(arturovt): we should not override default config values because this breaks support for having multiple socket connections. if (options) { - this.mergeConfigWithOptions(options); + if (options.serializer) { + this._options.serializer = options.serializer; + } + + if (options.deserializer) { + this._options.deserializer = options.deserializer; + } } - this.socket = new WebSocketSubject(this.config); - - this.socket.subscribe({ - next: (message: any) => { - const type = getValue(message, this.typeKey); - if (!type) { - throw new TypeKeyPropertyMissingError(this.typeKey); - } - this.store.dispatch({ ...message, type }); - }, - error: (error: any) => { - if (error instanceof CloseEvent) { - this.dispatchWebSocketDisconnected(); - } else { - this.store.dispatch(new WebsocketMessageError(error)); - } + this._ngZone.runOutsideAngular(() => { + // We either use options provided in the `ConnectWebSocket` action + // or fallback to default config values. + const url = options?.url || this._options.url!; + const protocol = options?.protocol || this._options.protocol; + const binaryType = options?.binaryType || this._options.binaryType; + + const socket = (this._socket = protocol + ? new WebSocket(url, protocol) + : new WebSocket(url)); + + if (binaryType) { + socket.binaryType = binaryType; } + + fromEvent(socket, 'open') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(() => this._store.dispatch(new WebSocketConnected())); + + fromEvent(socket, 'message') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(event => { + const message = this._options.deserializer!(event); + const type = getValue(message, this._typeKey); + if (!type) { + throw new TypeKeyPropertyMissingError(this._typeKey); + } + this._store.dispatch({ ...message, type }); + }); + + fromEvent(socket, 'error') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(error => { + // The error event indicates that an error has occurred during the + // WebSocket communication, and it is often appropriate to close the + // WebSocket connection when such an error occurs. + // We need to call `_disconnect()` after the error event has been fired. + // This ensures that the WebSocket connection is properly closed to prevent + // potential resource leaks. + this._disconnect(/* forcelyCloseSocket */ true); + this._store.dispatch(new WebsocketMessageError(error)); + }); + + fromEvent(socket, 'close') + .pipe(takeUntil(this._socketClosed$)) + .subscribe(event => { + if (event.wasClean) { + // It is not necessary to call `socket.close()` after the `close` event + // has been fired. In fact, calling `socket.close()` within the `close` + // event handler or immediately after the event has been fired can lead + // to unexpected behavior. + this._disconnect(/* forcelyCloseSocket */ false); + } else { + // If the WebSocket `close` event has been fired and its `wasClean` + // property is falsy, it indicates that the WebSocket connection was + // closed in an unexpected or abnormal manner. + // We should call `socket.close()` in this scenario, we can ensure that + // the WebSocket connection is properly closed. + this._disconnect(/* forcelyCloseSocket */ true); + this._store.dispatch(new WebsocketMessageError(event)); + } + }); }); } - private disconnect(): void { - if (this.socket) { - this.closeConnection(); - this.dispatchWebSocketDisconnected(); + private _disconnect(forcelyCloseSocket: boolean): void { + if (this._socket) { + this._closeConnection(forcelyCloseSocket); + this._store.dispatch(new WebSocketDisconnected()); } } private send(data: any): void { - if (!this.socket) { + if (!this._socket) { throw new Error('You must connect to the socket before sending any data'); } - this.socket.next(data); - } - - /** - * Don't enlarge the `connect` method - */ - private mergeConfigWithOptions(options: NgxsWebsocketPluginOptions): void { - if (options.url) { - this.config.url = options.url; - } - - if (options.serializer) { - this.config.serializer = options.serializer; - } - - if (options.deserializer) { - this.config.deserializer = options.deserializer; + try { + this._socket.send(this._options.serializer!(data)); + } catch (error) { + this._store.dispatch(new WebsocketMessageError(error)); } } - /** - * To ensure we don't have any memory leaks - * e.g. if the user occasionally dispatched `ConnectWebSocket` twice - * then the previous subscription will still live in the memory - * to prevent such behavior - we close the previous connection if it exists - */ - private updateConnection(): void { - if (this.socket) { - this.closeConnection(); - this.store.dispatch(new WebSocketConnectionUpdated()); - } - } - - /** - * Used in many places so it's better to move the code into function - */ - private dispatchWebSocketDisconnected(): void { - this.store.dispatch(new WebSocketDisconnected()); - } - - private closeConnection(): void { - // `socket.complete()` closes the connection - // also it doesn't invoke the `onComplete` callback that we passed - // into `socket.subscribe(...)` - if (this.socket !== null) { - this.socket.complete(); - this.socket = null; + private _closeConnection(forcelyCloseSocket: boolean): void { + if (forcelyCloseSocket) { + this._socket?.close(); } + this._socket = null; + this._socketClosed$.next(); } } diff --git a/packages/websocket-plugin/src/websocket.module.ts b/packages/websocket-plugin/src/websocket.module.ts index 0e251c0c1..5205699b6 100644 --- a/packages/websocket-plugin/src/websocket.module.ts +++ b/packages/websocket-plugin/src/websocket.module.ts @@ -1,7 +1,7 @@ -import { NgModule, ModuleWithProviders, APP_INITIALIZER, InjectionToken } from '@angular/core'; +import { NgModule, ModuleWithProviders, InjectionToken } from '@angular/core'; import { WebSocketHandler } from './websocket-handler'; -import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS, noop } from './symbols'; +import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS } from './symbols'; export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) { return { @@ -18,17 +18,22 @@ export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) { }; } -export const USER_OPTIONS = new InjectionToken('USER_OPTIONS'); +declare const ngDevMode: boolean; + +const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode; + +export const USER_OPTIONS = new InjectionToken(NG_DEV_MODE ? 'USER_OPTIONS' : ''); @NgModule() export class NgxsWebsocketPluginModule { + constructor(_handler: WebSocketHandler) {} + static forRoot( options?: NgxsWebsocketPluginOptions ): ModuleWithProviders { return { ngModule: NgxsWebsocketPluginModule, providers: [ - WebSocketHandler, { provide: USER_OPTIONS, useValue: options @@ -37,12 +42,6 @@ export class NgxsWebsocketPluginModule { provide: NGXS_WEBSOCKET_OPTIONS, useFactory: websocketOptionsFactory, deps: [USER_OPTIONS] - }, - { - provide: APP_INITIALIZER, - useFactory: noop, - deps: [WebSocketHandler], - multi: true } ] };