Skip to content

Commit

Permalink
refactor(websocket-plugin): get rid off rxjs/webSocket and use `Web…
Browse files Browse the repository at this point in the history
…Socket` directly
  • Loading branch information
arturovt committed Jul 11, 2023
1 parent c9c7f9e commit b823b11
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 125 deletions.
10 changes: 5 additions & 5 deletions packages/websocket-plugin/src/symbols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { InjectionToken } from '@angular/core';

export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken('NGXS_WEBSOCKET_OPTIONS');
const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode;

export const NGXS_WEBSOCKET_OPTIONS = new InjectionToken<NgxsWebsocketPluginOptions>(
NG_DEV_MODE ? 'NGXS_WEBSOCKET_OPTIONS' : ''
);

export interface NgxsWebsocketPluginOptions {
/**
Expand Down Expand Up @@ -52,10 +56,6 @@ export interface NgxsWebsocketPluginOptions {
deserializer?: (e: MessageEvent) => any;
}

export function noop(..._args: any[]) {
return function() {};
}

/**
* Action to connect to the websocket. Optionally pass a URL.
*/
Expand Down
206 changes: 96 additions & 110 deletions packages/websocket-plugin/src/websocket-handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { Injectable, Inject, OnDestroy } from '@angular/core';
import { Injectable, Inject, OnDestroy, NgZone } from '@angular/core';
import { Actions, Store, getValue, ofActionDispatched } from '@ngxs/store';
import { Subscription } from 'rxjs';

import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { Subject, fromEvent } from 'rxjs';

import {
ConnectWebSocket,
Expand All @@ -16,129 +14,124 @@ import {
WebSocketConnectionUpdated,
WebSocketConnected
} from './symbols';
import { takeUntil } from 'rxjs/operators';

@Injectable()
@Injectable({ providedIn: 'root' })
export class WebSocketHandler implements OnDestroy {
private socket: WebSocketSubject<any> | null = null;

private config: WebSocketSubjectConfig<any> = {
url: this.options.url!,
protocol: this.options.protocol,
// Default binary type is `blob` for the global `WebSocket`
binaryType: this.options.binaryType,
serializer: this.options.serializer,
deserializer: this.options.deserializer,
closeObserver: {
next: () => {
// ATTENTION!
// See https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/dom/WebSocketSubject.ts#L340
// RxJS socket emits `onComplete` event only if `event.wasClean` is truthy
// and doesn't complete socket subject if it's falsy
this.disconnect();
}
},
openObserver: {
next: () => this.store.dispatch(new WebSocketConnected())
}
};
private _socket: WebSocket | null = null;

private typeKey = this.options.typeKey!;
private readonly _socketClosed$ = new Subject<void>();

private subscription = new Subscription();
private readonly _typeKey = this._options.typeKey!;

private readonly _destroy$ = new Subject<void>();

constructor(
private store: Store,
private actions$: Actions,
@Inject(NGXS_WEBSOCKET_OPTIONS) private options: NgxsWebsocketPluginOptions
private _store: Store,
private _ngZone: NgZone,
private _actions$: Actions,
@Inject(NGXS_WEBSOCKET_OPTIONS) private _options: NgxsWebsocketPluginOptions
) {
this.setupActionsListeners();
this._setupActionsListeners();
}

ngOnDestroy(): void {
this.closeConnection();
this.subscription.unsubscribe();
this._closeConnection();
this._destroy$.next();
}

private setupActionsListeners(): void {
this.subscription.add(
this.actions$.pipe(ofActionDispatched(ConnectWebSocket)).subscribe(({ payload }) => {
private _setupActionsListeners(): void {
this._actions$
.pipe(ofActionDispatched(ConnectWebSocket), takeUntil(this._destroy$))
.subscribe(({ payload }) => {
this.connect(payload);
})
);
});

this.subscription.add(
this.actions$.pipe(ofActionDispatched(DisconnectWebSocket)).subscribe(() => {
this.disconnect();
})
);
this._actions$
.pipe(ofActionDispatched(DisconnectWebSocket), takeUntil(this._destroy$))
.subscribe(() => {
this._disconnect();
});

this.subscription.add(
this.actions$.pipe(ofActionDispatched(SendWebSocketMessage)).subscribe(({ payload }) => {
this._actions$
.pipe(ofActionDispatched(SendWebSocketMessage), takeUntil(this._destroy$))
.subscribe(({ payload }) => {
this.send(payload);
})
);
});
}

private connect(options?: NgxsWebsocketPluginOptions): void {
this.updateConnection();
this._updateConnection();

// Users can pass the options in the connect method so
// if options aren't available at DI bootstrap they have access
// to pass them here
// TODO(arturovt): we should not override default config values because this breaks support for having multiple socket connections.
if (options) {
this.mergeConfigWithOptions(options);
if (options.serializer) {
this._options.serializer = options.serializer;
}

if (options.deserializer) {
this._options.deserializer = options.deserializer;
}
}

this.socket = new WebSocketSubject(this.config);

this.socket.subscribe({
next: (message: any) => {
const type = getValue(message, this.typeKey);
if (!type) {
throw new TypeKeyPropertyMissingError(this.typeKey);
}
this.store.dispatch({ ...message, type });
},
error: (error: any) => {
if (error instanceof CloseEvent) {
this.dispatchWebSocketDisconnected();
} else {
this.store.dispatch(new WebsocketMessageError(error));
}
this._ngZone.runOutsideAngular(() => {
// We either use options provided in the `ConnectWebSocket` action or fallback to default config values.
const url = options?.url || this._options.url!;
const protocol = options?.protocol || this._options.protocol;
const binaryType = options?.binaryType || this._options.binaryType;

const socket = (this._socket = protocol
? new WebSocket(url, protocol)
: new WebSocket(url));

if (binaryType) {
socket.binaryType = binaryType;
}

fromEvent(socket, 'open')
.pipe(takeUntil(this._socketClosed$))
.subscribe(() => this._store.dispatch(new WebSocketConnected()));

fromEvent<MessageEvent>(socket, 'message')
.pipe(takeUntil(this._socketClosed$))
.subscribe(event => {
const message = this._options.deserializer!(event);
const type = getValue(message, this._typeKey);
if (!type) {
throw new TypeKeyPropertyMissingError(this._typeKey);
}
this._store.dispatch({ ...message, type });
});

fromEvent(socket, 'error')
.pipe(takeUntil(this._socketClosed$))
.subscribe(error => this._store.dispatch(new WebsocketMessageError(error)));

fromEvent<CloseEvent>(socket, 'close')
.pipe(takeUntil(this._socketClosed$))
.subscribe(event => {
if (event.wasClean) {
this._disconnect();
} else {
this._store.dispatch(new WebsocketMessageError(event));
}
});
});
}

private disconnect(): void {
if (this.socket) {
this.closeConnection();
this.dispatchWebSocketDisconnected();
private _disconnect(): void {
if (this._socket) {
this._closeConnection();
this._dispatchWebSocketDisconnected();
}
}

private send(data: any): void {
if (!this.socket) {
if (!this._socket) {
throw new Error('You must connect to the socket before sending any data');
}

this.socket.next(data);
}

/**
* Don't enlarge the `connect` method
*/
private mergeConfigWithOptions(options: NgxsWebsocketPluginOptions): void {
if (options.url) {
this.config.url = options.url;
}

if (options.serializer) {
this.config.serializer = options.serializer;
}

if (options.deserializer) {
this.config.deserializer = options.deserializer;
}
this._socket.send(this._options.serializer!(data));
}

/**
Expand All @@ -147,27 +140,20 @@ export class WebSocketHandler implements OnDestroy {
* then the previous subscription will still live in the memory
* to prevent such behavior - we close the previous connection if it exists
*/
private updateConnection(): void {
if (this.socket) {
this.closeConnection();
this.store.dispatch(new WebSocketConnectionUpdated());
private _updateConnection(): void {
if (this._socket) {
this._closeConnection();
this._store.dispatch(new WebSocketConnectionUpdated());
}
}

/**
* Used in many places so it's better to move the code into function
*/
private dispatchWebSocketDisconnected(): void {
this.store.dispatch(new WebSocketDisconnected());
private _dispatchWebSocketDisconnected(): void {
this._store.dispatch(new WebSocketDisconnected());
}

private closeConnection(): void {
// `socket.complete()` closes the connection
// also it doesn't invoke the `onComplete` callback that we passed
// into `socket.subscribe(...)`
if (this.socket !== null) {
this.socket.complete();
this.socket = null;
}
private _closeConnection(): void {
this._socket?.close();
this._socket = null;
this._socketClosed$.next();
}
}
17 changes: 7 additions & 10 deletions packages/websocket-plugin/src/websocket.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NgModule, ModuleWithProviders, APP_INITIALIZER, InjectionToken } from '@angular/core';
import { NgModule, ModuleWithProviders, InjectionToken } from '@angular/core';

import { WebSocketHandler } from './websocket-handler';
import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS, noop } from './symbols';
import { NgxsWebsocketPluginOptions, NGXS_WEBSOCKET_OPTIONS } from './symbols';

export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) {
return {
Expand All @@ -18,17 +18,20 @@ export function websocketOptionsFactory(options: NgxsWebsocketPluginOptions) {
};
}

export const USER_OPTIONS = new InjectionToken('USER_OPTIONS');
const NG_DEV_MODE = typeof ngDevMode === 'undefined' || ngDevMode;

export const USER_OPTIONS = new InjectionToken(NG_DEV_MODE ? 'USER_OPTIONS' : '');

@NgModule()
export class NgxsWebsocketPluginModule {
constructor(_handler: WebSocketHandler) {}

static forRoot(
options?: NgxsWebsocketPluginOptions
): ModuleWithProviders<NgxsWebsocketPluginModule> {
return {
ngModule: NgxsWebsocketPluginModule,
providers: [
WebSocketHandler,
{
provide: USER_OPTIONS,
useValue: options
Expand All @@ -37,12 +40,6 @@ export class NgxsWebsocketPluginModule {
provide: NGXS_WEBSOCKET_OPTIONS,
useFactory: websocketOptionsFactory,
deps: [USER_OPTIONS]
},
{
provide: APP_INITIALIZER,
useFactory: noop,
deps: [WebSocketHandler],
multi: true
}
]
};
Expand Down

0 comments on commit b823b11

Please sign in to comment.