Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
Ensure messages are picked up by Angular change detection (#57)
Browse files Browse the repository at this point in the history
* Ensure messages are picked up by Angular change detection

Transport 1.3.1 introduced a bug when cutting down the number of
`setTimeout()` invocations where Angular applications would not
pick up the changes even after messages have been received, unless
the send operations were scheduled as a macro task. This PR fixes
the issue so that messages arriving in a bus channel would always
be picked up by the Angular change detection system no matter how
the message was sent.

Signed-off-by: Josh Kim <[email protected]>

* Address comments

Signed-off-by: Josh Kim <[email protected]>

* Unit tests to cover new methods

Signed-off-by: Josh Kim <[email protected]>
  • Loading branch information
jooskim committed Aug 26, 2021
1 parent fb990d8 commit 1cb3251
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 7 deletions.
14 changes: 12 additions & 2 deletions build/npm/transport.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@
],
"license": "BSD-2-Clause",
"changelogHistory": [
{
"date": "8/25/21",
"version": "1.3.2",
"notes": [
{
"description": "Ensure messages are picked up by Angular change detection",
"review_uri": "https://github.com/vmware/transport-typescript/pull/55"
}
]
},
{
"date": "8/24/21",
"version": "1.3.1",
Expand All @@ -37,8 +47,8 @@
"review_uri": "https://github.com/vmware/transport-typescript/pull/54"
}
]
}
,{
},
{
"date": "8/20/21",
"version": "1.3.0",
"notes": [
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@vmw/transport",
"version": "1.3.1",
"version": "1.3.2",
"private": false,
"license": "BSD-2-Clause",
"repository": "[email protected]:vmware/transport-typescript.git",
Expand Down
17 changes: 16 additions & 1 deletion src/bus.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { BrokerConnector } from './bridge';
export declare type NgZoneRef = any;

// current version
const version = '1.3.1';
const version = '1.3.2';

export type ChannelName = string;
export type SentFrom = string;
Expand Down Expand Up @@ -560,6 +560,11 @@ export abstract class EventBus {

/** Set NgZone reference for efficient macro task scheduling in Angular apps */
abstract setNgZoneRef(ngZoneRef: NgZoneRef): void;

/**
* Perform any cleanups to free up any resources for garbage collection
*/
abstract destroy(): void;
}

/**
Expand All @@ -575,6 +580,11 @@ export interface EventBusLowApi {
*/
readonly channelMap: Map<ChannelName, Channel>;

/**
* Subscription to Subject that schedules Angular change detection.
*/
ngViewRefreshSubscription?: Subscription;

/**
* A new channel is created by the first reference to it. All subsequent references to that channel are handed
* the same stream to subscribe to. Accessing this method increments the channels reference count.
Expand Down Expand Up @@ -847,6 +857,11 @@ export interface EventBusLowApi {
*/
getId(): UUID;

/**
* Set up Subject and subscribe to it that will fire as messages are emitted to trigger Angular change detection.
* As the name implies, should only be used when in Angular apps.
*/
setUpNgViewRefreshScheduler(): void;
}


Expand Down
24 changes: 22 additions & 2 deletions src/bus/bus.lowlevel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,31 @@ import {
MessageHandlerConfig
} from './model/message.model';
import { Observable, Subject, Subscription, merge } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { debounceTime, filter, map } from 'rxjs/operators';
import { Logger } from '../log/logger.service';
import { LogLevel } from '../log/logger.model';
import { MonitorChannel, MonitorObject, MonitorType } from './model/monitor.model';
import { LogUtil } from '../log/util';
import { UUID } from './store/store.model';
import { GeneralUtil } from '../util/util';

// interval to debounce the scheduling of angular change detection with
const NGZONE_TRIGGER_DEBOUNCE_THRESHOLD = 10;

export class EventBusLowLevelApiImpl implements EventBusLowApi {

readonly channelMap: Map<ChannelName, Channel>;

private log: Logger;
private monitorChannel = MonitorChannel.stream;
private monitorStream: Channel;
private dumpMonitor: boolean;
private internalChannelMap: Map<string, Channel>;
public loggerInstance: Logger;
private id: UUID;
private ngViewRefreshSubject?: Subject<void>;

public ngViewRefreshSubscription?: Subscription;
public loggerInstance: Logger;

public getId(): UUID {
return this.id;
Expand Down Expand Up @@ -303,6 +310,7 @@ export class EventBusLowLevelApiImpl implements EventBusLowApi {
this.eventBusRef.zoneRef.runOutsideAngular(() => {
this.internalChannelMap.get(cname).send(message);
this.monitorStream.send(new Message().request(mo));
this.ngViewRefreshSubject?.next();
});
} else {
this.monitorStream.send(new Message().request(mo));
Expand Down Expand Up @@ -347,6 +355,18 @@ export class EventBusLowLevelApiImpl implements EventBusLowApi {
return this.createMessageHandler(handlerConfig, requestStream, name, id);
}

setUpNgViewRefreshScheduler(): void {
// kill any existing subscription
this.ngViewRefreshSubscription?.unsubscribe();

// create a Subject that triggers Angular change detection when a new message arrives. to prevent
// suffocating CPU resource from too many cycles, debounce the incoming messages by NGZONE_TRIGGER_DEBOUNCE_THRESHOLD milliseconds.
this.ngViewRefreshSubject = new Subject();
this.ngViewRefreshSubscription = this.ngViewRefreshSubject
.pipe(debounceTime(NGZONE_TRIGGER_DEBOUNCE_THRESHOLD))
.subscribe(() => this.eventBusRef.zoneRef.run(() => {}));
}

/**
* PRIVATE METHODS.
*/
Expand Down
99 changes: 99 additions & 0 deletions src/bus/bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { GeneralUtil } from '../util/util';
import { MessageHandler, MessageResponder, MessageType } from '../bus.api';
import { BridgeConnectionAdvancedConfig, StompConfig } from '../bridge';
import { StompBusCommand } from '../bridge/stomp.model';
import { NgZoneRef } from '.';

function makeCallCountCaller(done: any, targetCount: number): any {
let count = 0;
Expand Down Expand Up @@ -2119,4 +2120,102 @@ describe('TransportEventBus [bus/bus.ts]', () => {
}
);
});

describe('NgZone support tests', () => {
it('ngZoneRef should be set', () => {
const ngZoneRef: NgZoneRef = {};

// pass NgZone to bus
bus.setNgZoneRef(ngZoneRef);

expect(bus.zoneRef).not.toBeUndefined();
});

it('Angular change detection is invoked when ngZoneRef is present', (done) => {
let changeDetectionCount = 0;

const ngZoneRef: NgZoneRef = {
run(fn: any): void {
changeDetectionCount++;
expect(fn instanceof Function).toBeTruthy();
expect(changeDetectionCount).toBe(1);
done();
},
runOutsideAngular(fn: Function): void {
fn();
}
};

// set up a channel
bus.api.getChannel(testChannel);

// pass NgZone to bus
bus.setNgZoneRef(ngZoneRef);

// send message
bus.sendResponseMessage(testChannel, 'testing');
});

it('Angular change detection is debounced if messages arrive rapidly', (done) => {
let changeDetectionCount = 0;
let sentMessagesCount = 0;

const ngZoneRef: NgZoneRef = {
run(fn: any): void {
changeDetectionCount++;
expect(changeDetectionCount).toBe(1);
expect(sentMessagesCount).toBe(100);
done();
},
runOutsideAngular(fn: Function): void {
sentMessagesCount++;
fn();
}
};

// set up a channel
bus.api.getChannel(testChannel);

// pass NgZone to bus
bus.setNgZoneRef(ngZoneRef);

// send messages
for (let i = 0; i < 100; i++) {
bus.sendResponseMessage(testChannel, 'testing');
}
});

it('Angular change detection is not invoked when ngZoneRef is not set', (done) => {
bus.listenOnce(testChannel)
.handle(() => {
expect(bus.api.ngViewRefreshSubscription).toBeUndefined();
done();
});

// set up a channel
bus.api.getChannel(testChannel);

// send message
bus.sendResponseMessage(testChannel, 'testing');
});

it('Subscription is properly disposed of when bus is destroyed', () => {
const ngZoneRef: NgZoneRef = {
run(fn: any): void {},
runOutsideAngular(fn: Function): void {}
};

// pass NgZone to bus
bus.setNgZoneRef(ngZoneRef);

// subscription is alive
expect(bus.api.ngViewRefreshSubscription.closed).toBeFalsy();

// clean up any tied up resources
bus.destroy();

// ngViewRefreshSubscription should be closed
expect(bus.api.ngViewRefreshSubscription.closed).toBeTruthy();
});
});
});
10 changes: 9 additions & 1 deletion src/bus/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import { FabricApi } from '../fabric.api';
import { FabricApiImpl } from '../fabric/fabric';
import { NgZoneRef } from '.';


export class TransportEventBus extends EventBus implements EventBusEnabled {

private static instance: EventBus;
Expand All @@ -46,6 +45,7 @@ export class TransportEventBus extends EventBus implements EventBusEnabled {
* Destroy the bus completely.
*/
public static destroy(): void {
this.instance.destroy();
this.instance = null;
}

Expand Down Expand Up @@ -165,6 +165,14 @@ export class TransportEventBus extends EventBus implements EventBusEnabled {

public setNgZoneRef(ngZoneRef: NgZoneRef): void {
this.zoneRef = ngZoneRef;
this.api.setUpNgViewRefreshScheduler();
}

public destroy(): void {
// unsubscribe from Angular change detection scheduler
this.api.ngViewRefreshSubscription?.unsubscribe();

// other future cleanup code goes in here
}

public enableDevMode(): void {
Expand Down

0 comments on commit 1cb3251

Please sign in to comment.