Skip to content

Commit

Permalink
refactor(websocket-plugin): get rid off rxjs/webSocket and use `Web…
Browse files Browse the repository at this point in the history
…Socket` directly

This commit updated the WebSocket handler implementation and switched to
using the native `WebSocket` directly, instead of relying on `rxjs/webSocket`.
The RxJS WebSocket implementation can be cumbersome when it comes to handling
socket events and lacks simplicity. By handling socket events manually, we avoid
the need to use the RxJS implementation.
  • Loading branch information
arturovt committed Jul 23, 2023
1 parent b5c0348 commit 815c37a
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 134 deletions.
12 changes: 7 additions & 5 deletions packages/websocket-plugin/src/symbols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { InjectionToken } from '@angular/core';

export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken('NGXS_WEBSOCKET_OPTIONS');
declare const ngDevMode: boolean;

const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode;

export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken<NgxsWebsocketPluginOptions>(
NG_DEV_MODE ? 'NGXS_WEBSOCKET_OPTIONS' : ''
);

export interface NgxsWebsocketPluginOptions {
/**
Expand Down Expand Up @@ -52,10 +58,6 @@ export interface NgxsWebsocketPluginOptions {
deserializer?: (e: MessageEvent) => any;
}

export function noop(..._args: any[]) {
return function() {};
}

/**
* Action to connect to the websocket. Optionally pass a URL.
*/
Expand Down
236 changes: 117 additions & 119 deletions packages/websocket-plugin/src/websocket-handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Injectable, Inject, OnDestroy } from '@angular/core';
import { Injectable, Inject, OnDestroy, NgZone } from '@angular/core';
import { Actions, Store, getValue, ofActionDispatched } from '@ngxs/store';
import { Subscription } from 'rxjs';

import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { Subject, fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

import {
ConnectWebSocket,
Expand All @@ -17,157 +16,156 @@ import {
WebSocketConnected
} from './symbols';

@Injectable()
@Injectable({ providedIn: 'root' })
export class WebSocketHandler implements OnDestroy {
private socket: WebSocketSubject<any> | null = null;

private config: WebSocketSubjectConfig<any> = {
url: this.options.url!,
protocol: this.options.protocol,
// Default binary type is `blob` for the global `WebSocket`
binaryType: this.options.binaryType,
serializer: this.options.serializer,
deserializer: this.options.deserializer,
closeObserver: {
next: () => {
// ATTENTION!
// See https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/dom/WebSocketSubject.ts#L340
// RxJS socket emits `onComplete` event only if `event.wasClean` is truthy
// and doesn't complete socket subject if it's falsy
this.disconnect();
}
},
openObserver: {
next: () => this.store.dispatch(new WebSocketConnected())
}
};
private _socket: WebSocket | null = null;

private typeKey = this.options.typeKey!;
private readonly _socketClosed$ = new Subject<void>();

private subscription = new Subscription();
private readonly _typeKey = this._options.typeKey!;

private readonly _destroy$ = new Subject<void>();

constructor(
private store: Store,
private actions$: Actions,
@Inject(NGXS_WEBSOCKET_OPTIONS) private options: NgxsWebsocketPluginOptions
private _store: Store,
private _ngZone: NgZone,
private _actions$: Actions,
@Inject(NGXS_WEBSOCKET_OPTIONS) private _options: NgxsWebsocketPluginOptions
) {
this.setupActionsListeners();
this._setupActionsListeners();
}

ngOnDestroy(): void {
this.closeConnection();
this.subscription.unsubscribe();
this._disconnect(/* forcelyCloseSocket */ true);
this._destroy$.next();
}

private setupActionsListeners(): void {
this.subscription.add(
this.actions$.pipe(ofActionDispatched(ConnectWebSocket)).subscribe(({ payload }) => {
private _setupActionsListeners(): void {
this._actions$
.pipe(ofActionDispatched(ConnectWebSocket), takeUntil(this._destroy$))
.subscribe(({ payload }) => {
this.connect(payload);
})
);
});

this.subscription.add(
this.actions$.pipe(ofActionDispatched(DisconnectWebSocket)).subscribe(() => {
this.disconnect();
})
);
this._actions$
.pipe(ofActionDispatched(DisconnectWebSocket), takeUntil(this._destroy$))
.subscribe(() => {
this._disconnect(/* forcelyCloseSocket */ true);
});

this.subscription.add(
this.actions$.pipe(ofActionDispatched(SendWebSocketMessage)).subscribe(({ payload }) => {
this._actions$
.pipe(ofActionDispatched(SendWebSocketMessage), takeUntil(this._destroy$))
.subscribe(({ payload }) => {
this.send(payload);
})
);
});
}

private connect(options?: NgxsWebsocketPluginOptions): void {
this.updateConnection();
if (this._socket) {
this._closeConnection(/* forcelyCloseSocket */ true);
this._store.dispatch(new WebSocketConnectionUpdated());
}

// Users can pass the options in the connect method so
// if options aren't available at DI bootstrap they have access
// to pass them here
// TODO(arturovt): we should not override default config values because this breaks support for having multiple socket connections.
if (options) {
this.mergeConfigWithOptions(options);
if (options.serializer) {
this._options.serializer = options.serializer;
}

if (options.deserializer) {
this._options.deserializer = options.deserializer;
}
}

this.socket = new WebSocketSubject(this.config);

this.socket.subscribe({
next: (message: any) => {
const type = getValue(message, this.typeKey);
if (!type) {
throw new TypeKeyPropertyMissingError(this.typeKey);
}
this.store.dispatch({ ...message, type });
},
error: (error: any) => {
if (error instanceof CloseEvent) {
this.dispatchWebSocketDisconnected();
} else {
this.store.dispatch(new WebsocketMessageError(error));
}
this._ngZone.runOutsideAngular(() => {
// We either use options provided in the `ConnectWebSocket` action
// or fallback to default config values.
const url = options?.url || this._options.url!;
const protocol = options?.protocol || this._options.protocol;
const binaryType = options?.binaryType || this._options.binaryType;

const socket = (this._socket = protocol
? new WebSocket(url, protocol)
: new WebSocket(url));

if (binaryType) {
socket.binaryType = binaryType;
}

fromEvent(socket, 'open')
.pipe(takeUntil(this._socketClosed$))
.subscribe(() => this._store.dispatch(new WebSocketConnected()));

fromEvent<MessageEvent>(socket, 'message')
.pipe(takeUntil(this._socketClosed$))
.subscribe(event => {
const message = this._options.deserializer!(event);
const type = getValue(message, this._typeKey);
if (!type) {
throw new TypeKeyPropertyMissingError(this._typeKey);
}
this._store.dispatch({ ...message, type });
});

fromEvent(socket, 'error')
.pipe(takeUntil(this._socketClosed$))
.subscribe(error => {
// The error event indicates that an error has occurred during the
// WebSocket communication, and it is often appropriate to close the
// WebSocket connection when such an error occurs.
// We need to call `_disconnect()` after the error event has been fired.
// This ensures that the WebSocket connection is properly closed to prevent
// potential resource leaks.
this._disconnect(/* forcelyCloseSocket */ true);
this._store.dispatch(new WebsocketMessageError(error));
});

fromEvent<CloseEvent>(socket, 'close')
.pipe(takeUntil(this._socketClosed$))
.subscribe(event => {
if (event.wasClean) {
// It is not necessary to call `socket.close()` after the `close` event
// has been fired. In fact, calling `socket.close()` within the `close`
// event handler or immediately after the event has been fired can lead
// to unexpected behavior.
this._disconnect(/* forcelyCloseSocket */ false);
} else {
// If the WebSocket `close` event has been fired and its `wasClean`
// property is falsy, it indicates that the WebSocket connection was
// closed in an unexpected or abnormal manner.
// We should call `socket.close()` in this scenario, we can ensure that
// the WebSocket connection is properly closed.
this._disconnect(/* forcelyCloseSocket */ true);
this._store.dispatch(new WebsocketMessageError(event));
}
});
});
}

private disconnect(): void {
if (this.socket) {
this.closeConnection();
this.dispatchWebSocketDisconnected();
private _disconnect(forcelyCloseSocket: boolean): void {
if (this._socket) {
this._closeConnection(forcelyCloseSocket);
this._store.dispatch(new WebSocketDisconnected());
}
}

private send(data: any): void {
if (!this.socket) {
if (!this._socket) {
throw new Error('You must connect to the socket before sending any data');
}

this.socket.next(data);
}

/**
* Don't enlarge the `connect` method
*/
private mergeConfigWithOptions(options: NgxsWebsocketPluginOptions): void {
if (options.url) {
this.config.url = options.url;
}

if (options.serializer) {
this.config.serializer = options.serializer;
}

if (options.deserializer) {
this.config.deserializer = options.deserializer;
try {
this._socket.send(this._options.serializer!(data));
} catch (error) {
this._store.dispatch(new WebsocketMessageError(error));
}
}

/**
* To ensure we don't have any memory leaks
* e.g. if the user occasionally dispatched `ConnectWebSocket` twice
* then the previous subscription will still live in the memory
* to prevent such behavior - we close the previous connection if it exists
*/
private updateConnection(): void {
if (this.socket) {
this.closeConnection();
this.store.dispatch(new WebSocketConnectionUpdated());
}
}

/**
* Used in many places so it's better to move the code into function
*/
private dispatchWebSocketDisconnected(): void {
this.store.dispatch(new WebSocketDisconnected());
}

private closeConnection(): void {
// `socket.complete()` closes the connection
// also it doesn't invoke the `onComplete` callback that we passed
// into `socket.subscribe(...)`
if (this.socket !== null) {
this.socket.complete();
this.socket = null;
private _closeConnection(forcelyCloseSocket: boolean): void {
if (forcelyCloseSocket) {
this._socket?.close();
}
this._socket = null;
this._socketClosed$.next();
}
}
19 changes: 9 additions & 10 deletions packages/websocket-plugin/src/websocket.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NgModule, ModuleWithProviders, APP_INITIALIZER, InjectionToken } from '@angular/core';
import { NgModule, ModuleWithProviders, InjectionToken } from '@angular/core';

import { WebSocketHandler } from './websocket-handler';
import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS, noop } from './symbols';
import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS } from './symbols';

export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) {
return {
Expand All @@ -18,17 +18,22 @@ export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) {
};
}

export const USER_OPTIONS = new InjectionToken('USER_OPTIONS');
declare const ngDevMode: boolean;

const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode;

export const USER_OPTIONS = new InjectionToken(NG_DEV_MODE ? 'USER_OPTIONS' : '');

@NgModule()
export class NgxsWebsocketPluginModule {
constructor(_handler: WebSocketHandler) {}

static forRoot(
options?: NgxsWebsocketPluginOptions
): ModuleWithProviders<NgxsWebsocketPluginModule> {
return {
ngModule: NgxsWebsocketPluginModule,
providers: [
WebSocketHandler,
{
provide: USER_OPTIONS,
useValue: options
Expand All @@ -37,12 +42,6 @@ export class NgxsWebsocketPluginModule {
provide: NGXS_WEBSOCKET_OPTIONS,
useFactory: websocketOptionsFactory,
deps: [USER_OPTIONS]
},
{
provide: APP_INITIALIZER,
useFactory: noop,
deps: [WebSocketHandler],
multi: true
}
]
};
Expand Down

0 comments on commit 815c37a

Please sign in to comment.