Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(websocket-plugin): get rid off rxjs/webSocket and use WebSocket directly #2033

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ $ npm install @ngxs/store@dev
- Fix: Devtools Plugin - Do not re-enter Angular zone when resetting state [#2038](https://github.com/ngxs/store/pull/2038)
- Performance: Tree-shake selectors validation errors [#2020](https://github.com/ngxs/store/pull/2020)
- Refactor: Replace `get type()` with `type =` in actions [#2035](https://github.com/ngxs/store/pull/2035)
- Refactor: WebSocket Plugin - Get rid off `rxjs/webSocket` and use `WebSocket` directly [#2033](https://github.com/ngxs/store/pull/2033)

# 3.8.1 2023-05-16

Expand Down
36 changes: 36 additions & 0 deletions packages/websocket-plugin/src/providers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { APP_INITIALIZER } from '@angular/core';

import { WebSocketHandler } from './websocket-handler';
import { USER_OPTIONS, NGXS_WEBSOCKET_OPTIONS, NgxsWebsocketPluginOptions } from './symbols';

export function ɵwebsocketOptionsFactory(options: NgxsWebsocketPluginOptions) {
return {
reconnectInterval: 5000,
reconnectAttempts: 10,
typeKey: 'type',
deserializer(e: MessageEvent) {
return JSON.parse(e.data);
},
serializer(value: any) {
return JSON.stringify(value);
},
...options
};
}

export function ɵgetProviders(options?: NgxsWebsocketPluginOptions) {
return [
{ provide: USER_OPTIONS, useValue: options },
{
provide: NGXS_WEBSOCKET_OPTIONS,
useFactory: ɵwebsocketOptionsFactory,
deps: [USER_OPTIONS]
},
{
provide: APP_INITIALIZER,
useFactory: () => () => {},
deps: [WebSocketHandler],
multi: true
}
];
}
14 changes: 9 additions & 5 deletions packages/websocket-plugin/src/symbols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { InjectionToken } from '@angular/core';

export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken('NGXS_WEBSOCKET_OPTIONS');
declare const ngDevMode: boolean;

const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode;

export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken<NgxsWebsocketPluginOptions>(
NG_DEV_MODE ? 'NGXS_WEBSOCKET_OPTIONS' : ''
);

export const USER_OPTIONS = new InjectionToken(NG_DEV_MODE ? 'USER_OPTIONS' : '');

export interface NgxsWebsocketPluginOptions {
/**
Expand Down Expand Up @@ -52,10 +60,6 @@ export interface NgxsWebsocketPluginOptions {
deserializer?: (e: MessageEvent) => any;
}

export function noop(..._args: any[]) {
return function () {};
}

/**
* Action to connect to the websocket. Optionally pass a URL.
*/
Expand Down
236 changes: 117 additions & 119 deletions packages/websocket-plugin/src/websocket-handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Injectable, Inject, OnDestroy } from '@angular/core';
import { Injectable, Inject, OnDestroy, NgZone } from '@angular/core';
import { Actions, Store, getValue, ofActionDispatched } from '@ngxs/store';
import { Subscription } from 'rxjs';

import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { ReplaySubject, Subject, fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

import {
ConnectWebSocket,
Expand All @@ -17,157 +16,156 @@ import {
WebSocketConnected
} from './symbols';

@Injectable()
@Injectable({ providedIn: 'root' })
export class WebSocketHandler implements OnDestroy {
private socket: WebSocketSubject<any> | null = null;

private config: WebSocketSubjectConfig<any> = {
url: this.options.url!,
protocol: this.options.protocol,
// Default binary type is `blob` for the global `WebSocket`
binaryType: this.options.binaryType,
serializer: this.options.serializer,
deserializer: this.options.deserializer,
closeObserver: {
next: () => {
// ATTENTION!
// See https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/dom/WebSocketSubject.ts#L340
// RxJS socket emits `onComplete` event only if `event.wasClean` is truthy
// and doesn't complete socket subject if it's falsy
this.disconnect();
}
},
openObserver: {
next: () => this.store.dispatch(new WebSocketConnected())
}
};
private _socket: WebSocket | null = null;

private typeKey = this.options.typeKey!;
private readonly _socketClosed$ = new Subject<void>();

private subscription = new Subscription();
private readonly _typeKey = this._options.typeKey!;

private readonly _destroy$ = new ReplaySubject<void>(1);

constructor(
private store: Store,
private actions$: Actions,
@Inject(NGXS_WEBSOCKET_OPTIONS) private options: NgxsWebsocketPluginOptions
private _store: Store,
private _ngZone: NgZone,
private _actions$: Actions,
@Inject(NGXS_WEBSOCKET_OPTIONS) private _options: NgxsWebsocketPluginOptions
) {
this.setupActionsListeners();
this._setupActionsListeners();
}

ngOnDestroy(): void {
this.closeConnection();
this.subscription.unsubscribe();
this._disconnect(/* forcelyCloseSocket */ true);
this._destroy$.next();
}

private setupActionsListeners(): void {
this.subscription.add(
this.actions$.pipe(ofActionDispatched(ConnectWebSocket)).subscribe(({ payload }) => {
private _setupActionsListeners(): void {
this._actions$
.pipe(ofActionDispatched(ConnectWebSocket), takeUntil(this._destroy$))
.subscribe(({ payload }) => {
this.connect(payload);
})
);
});

this.subscription.add(
this.actions$.pipe(ofActionDispatched(DisconnectWebSocket)).subscribe(() => {
this.disconnect();
})
);
this._actions$
.pipe(ofActionDispatched(DisconnectWebSocket), takeUntil(this._destroy$))
.subscribe(() => {
this._disconnect(/* forcelyCloseSocket */ true);
});

this.subscription.add(
this.actions$.pipe(ofActionDispatched(SendWebSocketMessage)).subscribe(({ payload }) => {
this._actions$
.pipe(ofActionDispatched(SendWebSocketMessage), takeUntil(this._destroy$))
.subscribe(({ payload }) => {
this.send(payload);
})
);
});
}

private connect(options?: NgxsWebsocketPluginOptions): void {
this.updateConnection();
if (this._socket) {
this._closeConnection(/* forcelyCloseSocket */ true);
this._store.dispatch(new WebSocketConnectionUpdated());
}

// Users can pass the options in the connect method so
// if options aren't available at DI bootstrap they have access
// to pass them here
// TODO(arturovt): we should not override default config values because this breaks support for having multiple socket connections.
if (options) {
this.mergeConfigWithOptions(options);
if (options.serializer) {
this._options.serializer = options.serializer;
}

if (options.deserializer) {
this._options.deserializer = options.deserializer;
}
}

this.socket = new WebSocketSubject(this.config);

this.socket.subscribe({
next: (message: any) => {
const type = getValue(message, this.typeKey);
if (!type) {
throw new TypeKeyPropertyMissingError(this.typeKey);
}
this.store.dispatch({ ...message, type });
},
error: (error: any) => {
if (error instanceof CloseEvent) {
this.dispatchWebSocketDisconnected();
} else {
this.store.dispatch(new WebsocketMessageError(error));
}
this._ngZone.runOutsideAngular(() => {
// We either use options provided in the `ConnectWebSocket` action
// or fallback to default config values.
const url = options?.url || this._options.url!;
const protocol = options?.protocol || this._options.protocol;
const binaryType = options?.binaryType || this._options.binaryType;

const socket = (this._socket = protocol
? new WebSocket(url, protocol)
: new WebSocket(url));

if (binaryType) {
socket.binaryType = binaryType;
}

fromEvent(socket, 'open')
.pipe(takeUntil(this._socketClosed$))
.subscribe(() => this._store.dispatch(new WebSocketConnected()));

fromEvent<MessageEvent>(socket, 'message')
.pipe(takeUntil(this._socketClosed$))
.subscribe(event => {
const message = this._options.deserializer!(event);
const type = getValue(message, this._typeKey);
if (!type) {
throw new TypeKeyPropertyMissingError(this._typeKey);
}
this._store.dispatch({ ...message, type });
});

fromEvent(socket, 'error')
.pipe(takeUntil(this._socketClosed$))
.subscribe(error => {
// The error event indicates that an error has occurred during the
// WebSocket communication, and it is often appropriate to close the
// WebSocket connection when such an error occurs.
// We need to call `_disconnect()` after the error event has been fired.
// This ensures that the WebSocket connection is properly closed to prevent
// potential resource leaks.
this._disconnect(/* forcelyCloseSocket */ true);
this._store.dispatch(new WebsocketMessageError(error));
});

fromEvent<CloseEvent>(socket, 'close')
.pipe(takeUntil(this._socketClosed$))
.subscribe(event => {
if (event.wasClean) {
// It is not necessary to call `socket.close()` after the `close` event
// has been fired. In fact, calling `socket.close()` within the `close`
// event handler or immediately after the event has been fired can lead
// to unexpected behavior.
this._disconnect(/* forcelyCloseSocket */ false);
} else {
// If the WebSocket `close` event has been fired and its `wasClean`
// property is falsy, it indicates that the WebSocket connection was
// closed in an unexpected or abnormal manner.
// We should call `socket.close()` in this scenario, we can ensure that
// the WebSocket connection is properly closed.
this._disconnect(/* forcelyCloseSocket */ true);
this._store.dispatch(new WebsocketMessageError(event));
}
});
});
}

private disconnect(): void {
if (this.socket) {
this.closeConnection();
this.dispatchWebSocketDisconnected();
private _disconnect(forcelyCloseSocket: boolean): void {
if (this._socket) {
this._closeConnection(forcelyCloseSocket);
this._store.dispatch(new WebSocketDisconnected());
}
}

private send(data: any): void {
if (!this.socket) {
if (!this._socket) {
throw new Error('You must connect to the socket before sending any data');
}

this.socket.next(data);
}

/**
* Don't enlarge the `connect` method
*/
private mergeConfigWithOptions(options: NgxsWebsocketPluginOptions): void {
if (options.url) {
this.config.url = options.url;
}

if (options.serializer) {
this.config.serializer = options.serializer;
}

if (options.deserializer) {
this.config.deserializer = options.deserializer;
try {
this._socket.send(this._options.serializer!(data));
} catch (error) {
this._store.dispatch(new WebsocketMessageError(error));
}
}

/**
* To ensure we don't have any memory leaks
* e.g. if the user occasionally dispatched `ConnectWebSocket` twice
* then the previous subscription will still live in the memory
* to prevent such behavior - we close the previous connection if it exists
*/
private updateConnection(): void {
if (this.socket) {
this.closeConnection();
this.store.dispatch(new WebSocketConnectionUpdated());
}
}

/**
* Used in many places so it's better to move the code into function
*/
private dispatchWebSocketDisconnected(): void {
this.store.dispatch(new WebSocketDisconnected());
}

private closeConnection(): void {
// `socket.complete()` closes the connection
// also it doesn't invoke the `onComplete` callback that we passed
// into `socket.subscribe(...)`
if (this.socket !== null) {
this.socket.complete();
this.socket = null;
private _closeConnection(forcelyCloseSocket: boolean): void {
if (forcelyCloseSocket) {
this._socket?.close();
}
this._socket = null;
this._socketClosed$.next();
}
}
Loading
Loading