Skip to content

Commit

Permalink
refactor: combine channel and exec call in gRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Jul 29, 2024
1 parent 4ed7c48 commit 5f240d4
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 308 deletions.
3 changes: 1 addition & 2 deletions proto/index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ import "intermediate.proto";

service Runner {
rpc load(IRStage) returns (Empty);
rpc exec(Empty) returns (Empty);
rpc channel(stream ChannelMessage) returns (stream ChannelMessage);
rpc exec(stream ChannelMessage) returns (stream ChannelMessage);
}
12 changes: 6 additions & 6 deletions runners/nodejs/src/interfaces/callback_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ export class CallbackChannel<T> implements Writer<T> {
* Handle incoming messages of the channel by piping them into the callback.
* @private
*/
private handler = new Promise(async () => {
private handler = (async () => {
for await (const data of this.channel) {
await this.onWrite(data)
await this.onWrite(data);
}

if (this.onClose != null) {
await this.onClose()
await this.onClose();
}
});
})();

/**
* Create a new callback channel with a specific callback.
Expand All @@ -56,7 +56,7 @@ export class CallbackChannel<T> implements Writer<T> {
}

close(): Promise<void> {
this.channel.close()
this.channel.close();
return Promise.resolve();
}

Expand All @@ -65,6 +65,6 @@ export class CallbackChannel<T> implements Writer<T> {
}

write(data: T) {
this.channel.write(data)
this.channel.write(data);
}
}
12 changes: 7 additions & 5 deletions runners/nodejs/src/interfaces/channel.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { RunnerError } from "../error";
import { Reader } from "./reader";
import { Writer } from "./writer";
import {Log} from "./log";

/**
* A channel is a communication mechanism that allows for the transfer of values
Expand All @@ -20,7 +19,10 @@ export class Channel<T> implements Reader<T>, Writer<T> {
* Outstanding reads that are waiting for a value to be written to the channel.
* @private
*/
private readonly reads: Array<{ resolve: (value: T) => void, reject: (reason: any) => void }> = [];
private readonly reads: Array<{
resolve: (value: T) => void;
reject: (reason: unknown) => void;
}> = [];

/**
* Whether the channel has been closed or not.
Expand Down Expand Up @@ -73,7 +75,7 @@ export class Channel<T> implements Reader<T>, Writer<T> {
this.closed = true;

for (const promise of this.reads) {
promise.reject(RunnerError.channelError())
promise.reject(RunnerError.channelError());
}
}

Expand All @@ -91,10 +93,10 @@ export class Channel<T> implements Reader<T>, Writer<T> {
async *[Symbol.asyncIterator]() {
while (true) {
try {
yield await this.read()
yield await this.read();
} catch (e) {
if (this.closed && this.values.length == 0) {
break
break;
}
}
}
Expand Down
20 changes: 11 additions & 9 deletions runners/nodejs/src/interfaces/log.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { RunnerError } from "../error";

const TIME_PADDING = 15
const LEVEL_PADDING = 10
const FILE_PADDING = 39
const MESSAGE_PADDING = 87
const TIME_PADDING = 15;
const LEVEL_PADDING = 10;
const FILE_PADDING = 39;

enum LogLevel {
DEBUG,
Expand Down Expand Up @@ -64,12 +63,15 @@ export class Log {
* @private
*/
private push(value: { level: LogLevel; message: string }): void {
const time = new Date().toISOString().slice(11, 22).padEnd(TIME_PADDING, ' ')
const level = LogLevel[value.level].padEnd(LEVEL_PADDING, ' ')
const caller = this.getCaller().padEnd(FILE_PADDING, ' ')
const message = value.message
const time = new Date()
.toISOString()
.slice(11, 22)
.padEnd(TIME_PADDING, " ");
const level = LogLevel[value.level].padEnd(LEVEL_PADDING, " ");
const caller = this.getCaller().padEnd(FILE_PADDING, " ");
const message = value.message;

console.log(`${time}${level}${caller}${message}`)
console.log(`${time}${level}${caller}${message}`);
}

/**
Expand Down
92 changes: 66 additions & 26 deletions runners/nodejs/src/proto/channel.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 25 additions & 11 deletions runners/nodejs/src/proto/empty.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5f240d4

Please sign in to comment.