Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
Remove unnecessary macro task scheduling in send (#54)
Browse files Browse the repository at this point in the history
* Remove unnecessary macro task scheduling in send

Signed-off-by: Josh Kim <[email protected]>

* bus.request*() now requires explicit fire()

In an effort to use as few `setTimeout()` as possible
a small tradeoff was made to bus.request*() wherein once MessageHandler
instance is created, you will need to call its `fire()` method to set
off the action.

Signed-off-by: Josh Kim <[email protected]>

* Enable zone bypass by configuration for performance

Signed-off-by: Josh Kim <[email protected]>

* Revert bus.request* changes as no longer necessary

Signed-off-by: Josh Kim <[email protected]>
  • Loading branch information
jooskim authored Aug 25, 2021
1 parent 93a03e0 commit a49ff42
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 70 deletions.
12 changes: 11 additions & 1 deletion build/npm/transport.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
"license": "BSD-2-Clause",
"changelogHistory": [
{
"date": "8/24/21",
"version": "1.3.1",
"notes": [
{
"description": "Remove unnecessary macro task scheduling",
"review_uri": "https://github.com/vmware/transport-typescript/pull/54"
}
]
}
,{
"date": "8/20/21",
"version": "1.3.0",
"notes": [
Expand Down Expand Up @@ -1097,4 +1107,4 @@
]
}
]
}
}
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@vmw/transport",
"version": "1.3.0",
"version": "1.3.1",
"private": false,
"license": "BSD-2-Clause",
"repository": "[email protected]:vmware/transport-typescript.git",
Expand Down Expand Up @@ -66,4 +66,4 @@
"resolutions": {
"glob-parent": "^5.1.2"
}
}
}
10 changes: 5 additions & 5 deletions src/bridge/broker-connector.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ describe('BrokerConnector [broker-connector.ts]', () => {
session: session.id,
isQueue: false,
brokerPrefix: 'fake'
};
};
let headers = session.connect.calls.mostRecent().args[0];
let headerKeys = Object.keys(headers);
expect(globalHeaderKeys.every(key => headerKeys.includes(key))).toBeTruthy();
Expand All @@ -108,13 +108,13 @@ describe('BrokerConnector [broker-connector.ts]', () => {
bc.disconnectClient(session.id);
headers = session.disconnect.calls.mostRecent().args[0];
headerKeys = Object.keys(headers);
expect(globalHeaderKeys.every(key => headerKeys.includes(key))).toBeTruthy();
expect(globalHeaderKeys.every(key => headerKeys.includes(key))).toBeTruthy();

done();
},
10 // found by trial and error.
);
});
});
});

describe('Service configuration and basic connect/disconnect', () => {
Expand Down Expand Up @@ -1210,9 +1210,9 @@ describe('BrokerConnector [broker-connector.ts]', () => {
(done) => {
let count = 0;
/*
This tests that galactic channels operate over low level API's
*/

const chan = bus.api.getGalacticChannel(
Expand Down
13 changes: 12 additions & 1 deletion src/bus.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import { GeneralUtil } from './util/util';
import { FabricApi } from './fabric.api';
import { BrokerConnector } from './bridge';

// used to accept NgZone instance from Angular to schedule tasks outside of Zone, enabling efficient change detection
export declare type NgZoneRef = any;

// current version
const version = '1.3.0';
const version = '1.3.1';

export type ChannelName = string;
export type SentFrom = string;
Expand Down Expand Up @@ -160,6 +163,11 @@ export abstract class EventBus {
*/
readonly brokerConnector: BrokerConnector;

/**
* Reference to ngZone.
*/
public zoneRef: NgZoneRef;

/**
* Simple API Methods
*/
Expand Down Expand Up @@ -549,6 +557,9 @@ export abstract class EventBus {

/** Enable Fake Socket for broker connector */
abstract enableDevMode(): void;

/** Set NgZone reference for efficient macro task scheduling in Angular apps */
abstract setNgZoneRef(ngZoneRef: NgZoneRef): void;
}

/**
Expand Down
38 changes: 24 additions & 14 deletions src/bus/bus.lowlevel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,13 @@ export class EventBusLowLevelApiImpl implements EventBusLowApi {
}

sendRequest(cname: string, payload: any, name?: string): void {
let mh: MessageHandlerConfig = new MessageHandlerConfig(cname, payload, true, cname);
const mh: MessageHandlerConfig = new MessageHandlerConfig(cname, payload, true, cname);
this.send(mh.sendChannel, new Message().request(mh), name);
}

sendResponse(cname: string, payload: any, name?: string): void {
let mh: MessageHandlerConfig = new MessageHandlerConfig(cname, payload, true, cname);
this.tickEventLoop(
() => {
this.send(mh.sendChannel, new Message().response(mh), name);
}
);
const mh: MessageHandlerConfig = new MessageHandlerConfig(cname, payload, true, cname);
this.send(mh.sendChannel, new Message().response(mh), name);
}

complete(cname: ChannelName, from?: SentFrom): boolean {
Expand Down Expand Up @@ -289,17 +285,30 @@ export class EventBusLowLevelApiImpl implements EventBusLowApi {

send(cname: ChannelName, message: Message, from?: SentFrom): boolean {
message.sender = from; // make sure we know where this message came from.
let mo = new MonitorObject;
if (!this.internalChannelMap.has(cname)) {
mo = new MonitorObject().build(MonitorType.MonitorDropped, cname, from, message);
const channelFound = this.internalChannelMap.has(cname);
let mo = new MonitorObject().build(
channelFound ? MonitorType.MonitorData : MonitorType.MonitorDropped,
cname,
from,
message
);

if (!channelFound) {
this.monitorStream.send(new Message().request(mo));
return false;
}

mo = new MonitorObject().build(MonitorType.MonitorData, cname, from, message);
this.monitorStream.send(new Message().request(mo));
this.internalChannelMap.get(cname)
.send(message);

if (this.eventBusRef.zoneRef) {
this.eventBusRef.zoneRef.runOutsideAngular(() => {
this.internalChannelMap.get(cname).send(message);
this.monitorStream.send(new Message().request(mo));
});
} else {
this.monitorStream.send(new Message().request(mo));
this.internalChannelMap.get(cname).send(message);
}

return true;
}

Expand All @@ -325,6 +334,7 @@ export class EventBusLowLevelApiImpl implements EventBusLowApi {
const handler: MessageHandler<R, E> = this.createMessageHandler(handlerConfig, false, name, id);
this.send(handlerConfig.sendChannel, new Message(id).request(handlerConfig), name);
return handler;

}

respond<R, E = any>(handlerConfig: MessageHandlerConfig, name?: SentFrom): MessageResponder<R, E> {
Expand Down
31 changes: 15 additions & 16 deletions src/bus/bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {
() => bus.closeChannel('puppers')
, 5);

// should have settled
// should have settled
bus.api.tickEventLoop(
() => {

Expand All @@ -367,7 +367,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {

bus.sendErrorMessage('puppers', 'why are my shoes ruined?');

// should have settled
// should have settled
bus.api.tickEventLoop(
() => {
expect(bus.api.countListeners()).toEqual(3);
Expand All @@ -389,7 +389,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {

bus.sendErrorMessage('puppers', 'why are my shoes ruined?');

// should have settled
// should have settled
bus.api.tickEventLoop(
() => {
expect(bus.api.countListeners()).toEqual(3);
Expand Down Expand Up @@ -437,7 +437,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {

bus.sendRequestMessage('puppers', 'how many bones has fox hidden?');

// should have settled
// should have settled
bus.api.tickEventLoop(
() => {
responder.tick('ignore me');
Expand All @@ -449,7 +449,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {
, 10);


// should have settled
// should have settled
bus.api.tickEventLoop(
() => {
expect(count).toEqual(1); // only a single event should have made it through
Expand Down Expand Up @@ -494,7 +494,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {

bus.sendRequestMessage('puppers', 'how many bones has fox hidden?');

// should have settled
// should have settled
bus.api.tickEventLoop(
() => {
handler.tick('ignore me');
Expand All @@ -506,7 +506,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {
, 10);


// should have settled
// should have settled
bus.api.tickEventLoop(
() => {
expect(count).toEqual(1); // only a single event should have made it through
Expand Down Expand Up @@ -834,7 +834,6 @@ describe('TransportEventBus [bus/bus.ts]', () => {
}
}
);

}
);

Expand Down Expand Up @@ -1478,7 +1477,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {
expect(req).toEqual('where is my dinner?');
counter++;
if (counter === 3) {
responder.close(); // stop responding, but keep handling.
responder.close(); // stop responding, but keep handling.
}
return 'coming soon, calm down pup!';
}
Expand Down Expand Up @@ -1990,18 +1989,18 @@ describe('TransportEventBus [bus/bus.ts]', () => {
bus.listenGalacticStream('space-dogs', null, {brokerIdentity: 'connString', isPrivate: false});
bus.sendGalacticMessage('space-dogs', 'off to the moon goes fox!');
});

describe('markChannelAsGalactic()', () => {
const channelName = 'space-cats';

beforeEach(() => {
beforeEach(() => {
bus.markChannelAsGalactic(channelName, 'connString');
});

it('sets the channel to be galactic', () => {
const channel: Channel = bus.api.getChannelObject(channelName);

expect(channel.galactic).toEqual(true);
expect(channel.galactic).toEqual(true);
});

it('sends MonitorNewGalacticChannel message', (done) => {
Expand All @@ -2020,7 +2019,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {
}
}
);
});
});
});

it('markChannelsAsGalactic triggers markChannelAsGalactic for each channel names', () => {
Expand All @@ -2042,14 +2041,14 @@ describe('TransportEventBus [bus/bus.ts]', () => {
describe('markChannelAsLocal()', () => {
const channelName = 'space-cats';

beforeEach(() => {
beforeEach(() => {
bus.markChannelAsLocal(channelName);
});

it('sets the channel to be private', () => {
bus.markChannelAsLocal(channelName);
const channel: Channel = bus.api.getChannelObject(channelName);

expect(channel.galactic).toEqual(false);
});

Expand All @@ -2069,7 +2068,7 @@ describe('TransportEventBus [bus/bus.ts]', () => {
}
}
);

bus.markChannelAsLocal(channelName);
});
});
Expand Down
Loading

0 comments on commit a49ff42

Please sign in to comment.