Skip to content

Commit

Permalink
Implement Go Context in ts-nitro (#105)
Browse files Browse the repository at this point in the history
* Implement Context

* Use Context in Node and Engine

* Handle review comments

---------

Co-authored-by: neeraj <[email protected]>
  • Loading branch information
nikugogoi and neerajvijay1997 authored Aug 16, 2023
1 parent bac09a5 commit 8fd5e9f
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { WaitGroup } from '@jpwilliams/waitgroup';
import type { ReadChannel, ReadWriteChannel } from '@cerc-io/ts-channel';
import type { Log } from '@ethersproject/abstract-provider';
import Channel from '@cerc-io/ts-channel';
import { EthClient, go, hex2Bytes } from '@cerc-io/nitro-util';
import {
EthClient, go, hex2Bytes, Context,
} from '@cerc-io/nitro-util';

import {
ChainService, ChainEvent, DepositedEvent, ConcludedEvent, AllocationUpdatedEvent,
Expand Down Expand Up @@ -68,9 +70,9 @@ export class EthChainService implements ChainService {

private logger: debug.Debugger;

private ctx: AbortController;
private ctx: Context;

private cancel: (reason ?: any) => void;
private cancel: () => void;

private wg?: WaitGroup;

Expand All @@ -83,7 +85,7 @@ export class EthChainService implements ChainService {
txSigner: ethers.Signer,
out: ReadWriteChannel<ChainEvent>,
logger: debug.Debugger,
ctx: AbortController,
ctx: Context,
cancel: () => void,
wg: WaitGroup,
) {
Expand Down Expand Up @@ -150,8 +152,8 @@ export class EthChainService implements ChainService {
txSigner: ethers.Signer,
logDestination?: WritableStream,
): EthChainService {
const ctx = new AbortController();
const cancelCtx = ctx.abort.bind(ctx);
const ctx = new Context();
const cancelCtx = ctx.withCancel();

const out = Channel<ChainEvent>(10);

Expand Down Expand Up @@ -182,21 +184,15 @@ export class EthChainService implements ChainService {

// listenForErrors listens for errors on the error channel and attempts to handle them if they occur.
// TODO: Currently "handle" is panicking
private async listenForErrors(ctx: AbortController, errChan: ReadChannel<Error>): Promise<void> {
// Channel to implement ctx.Done()
const ctxDone = Channel();
this.ctx.signal.addEventListener('abort', (event) => {
ctxDone.close();
});

private async listenForErrors(ctx: Context, errChan: ReadChannel<Error>): Promise<void> {
/* eslint-disable no-await-in-loop */
/* eslint-disable default-case */
while (true) {
switch (await Channel.select([
ctxDone.shift(),
this.ctx.done.shift(),
errChan.shift(),
])) {
case ctxDone: {
case this.ctx.done: {
this.wg!.done();
return;
}
Expand Down Expand Up @@ -361,24 +357,18 @@ export class EthChainService implements ChainService {
query: ethers.providers.EventType,
listener: (eventLog: Log) => void,
) {
// Channel to implement ctx.Done()
const ctxDone = Channel();
this.ctx.signal.addEventListener('abort', (event) => {
ctxDone.close();
});

/* eslint-disable no-restricted-syntax */
/* eslint-disable no-labels */
out:
while (true) {
/* eslint-disable no-await-in-loop */
/* eslint-disable default-case */
switch (await Channel.select([
ctxDone.shift(),
this.ctx.done.shift(),
subErr.shift(),
logs.shift(),
])) {
case ctxDone: {
case this.ctx.done: {
subUnsubscribe();
this.wg!.done();
return;
Expand Down
18 changes: 6 additions & 12 deletions packages/nitro-node/src/node/engine/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { WaitGroup } from '@jpwilliams/waitgroup';

import Channel from '@cerc-io/ts-channel';
import type { ReadChannel, ReadWriteChannel } from '@cerc-io/ts-channel';
import { JSONbigNative, go } from '@cerc-io/nitro-util';
import { JSONbigNative, go, Context } from '@cerc-io/nitro-util';

import { MessageService } from './messageservice/messageservice';
import { ChainService, ChainEvent, ChainEventHandler } from './chainservice/chainservice';
Expand Down Expand Up @@ -217,8 +217,8 @@ export class Engine {

e.wg = new WaitGroup();

const ctx = new AbortController();
e.cancel = ctx.abort.bind(ctx);
const ctx = new Context();
e.cancel = ctx.withCancel();

e.wg.add(1);
go(e.run.bind(e), ctx);
Expand Down Expand Up @@ -255,20 +255,14 @@ export class Engine {

// run kicks of an infinite loop that waits for communications on the supplied channels, and handles them accordingly
// The loop exits when the context is cancelled.
async run(ctx: AbortController): Promise<void> {
async run(ctx: Context): Promise<void> {
assert(this.objectiveRequestsFromAPI);
assert(this.paymentRequestsFromAPI);
assert(this.fromChain);
assert(this.fromMsg);
assert(this.fromLedger);
assert(this._toApi);

// Channel to implement ctx.Done()
const ctxDone = Channel();
ctx.signal.addEventListener('abort', (event) => {
ctxDone.close();
});

while (true) {
let res = new EngineEvent();
let err: Error | null = null;
Expand All @@ -287,7 +281,7 @@ export class Engine {
this.fromChain.shift(),
this.fromMsg.shift(),
this.fromLedger.shift(),
ctxDone.shift(),
ctx.done.shift(),
])) {
case this.objectiveRequestsFromAPI:
[res, err] = await this.handleObjectiveRequest(this.objectiveRequestsFromAPI.value());
Expand All @@ -309,7 +303,7 @@ export class Engine {
[res, err] = await this.handleProposal(this.fromLedger.value());
break;

case ctxDone: {
case ctx.done: {
this.wg!.done();
return;
}
Expand Down
18 changes: 7 additions & 11 deletions packages/nitro-node/src/node/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { WaitGroup } from '@jpwilliams/waitgroup';

import type { ReadChannel, ReadWriteChannel } from '@cerc-io/ts-channel';
import Channel from '@cerc-io/ts-channel';
import { go, randUint64 } from '@cerc-io/nitro-util';
import { go, randUint64, Context } from '@cerc-io/nitro-util';

import { MessageService } from './engine/messageservice/messageservice';
import { ChainService } from './engine/chainservice/chainservice';
Expand Down Expand Up @@ -105,8 +105,8 @@ export class Node {

c.channelNotifier = ChannelNotifier.newChannelNotifier(store, c.vm);

const ctx = new AbortController();
c.cancelEventHandler = ctx.abort.bind(ctx);
const ctx = new Context();
c.cancelEventHandler = ctx.withCancel();

c.wg = new WaitGroup();
c.wg.add(1);
Expand Down Expand Up @@ -204,20 +204,16 @@ export class Node {
}

// handleEngineEvents is responsible for monitoring the ToApi channel on the engine.
// It parses events from the ToApi chan and then dispatches events to the necessary node chan.
private async handleEngineEvents(ctx: AbortController) {
// Channel to implement ctx.Done()
const ctxDone = Channel();
ctx.signal.onabort = () => { ctxDone.close(); };

// It parses events from the ToApi chan and then dispatches events to the necessary client chan.
private async handleEngineEvents(ctx: Context) {
/* eslint-disable no-await-in-loop */
/* eslint-disable default-case */
while (true) {
switch (await Channel.select([
ctxDone.shift(),
ctx.done.shift(),
this.engine.toApi.shift(),
])) {
case ctxDone: {
case ctx.done: {
this.wg!.done();
return;
}
Expand Down
1 change: 1 addition & 0 deletions packages/nitro-util/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
},
"dependencies": {
"@statechannels/nitro-protocol": "^2.0.0-alpha.4",
"@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
"assert": "^2.0.0",
"debug": "^4.3.4",
"ethers": "^5.7.2",
Expand Down
23 changes: 23 additions & 0 deletions packages/nitro-util/src/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { ReadWriteChannel } from '@cerc-io/ts-channel';
import Channel from '@cerc-io/ts-channel';

export class Context {
ctx: AbortController;

done: ReadWriteChannel<unknown>;

constructor() {
this.ctx = new AbortController();
this.done = Channel();

this.ctx.signal.addEventListener('abort', () => {
this.done.close();
});
}

withCancel(): () => void {
return () => {
this.ctx.abort();
};
}
}
1 change: 1 addition & 0 deletions packages/nitro-util/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from './contract-bindings';
export * from './types';
export * from './constants';
export * from './deploy-contracts';
export * from './context';

export {
INitroTypes, ExitFormat, DepositedEventObject, AllocationUpdatedEventObject, ConcludedEventObject,
Expand Down

0 comments on commit 8fd5e9f

Please sign in to comment.