Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: peer renewal connection drop & stream management #2145

Merged
merged 13 commits into from
Oct 1, 2024
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
}
],
"@typescript-eslint/explicit-member-accessibility": "error",
"@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_" }],
"prettier/prettier": [
"error",
{
Expand Down
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"@types/uuid": "^9.0.8",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"sinon": "^18.0.0",
"cspell": "^8.6.1",
"fast-check": "^3.19.0",
"ignore-loader": "^0.1.2",
Expand Down
161 changes: 161 additions & 0 deletions packages/core/src/lib/stream_manager/stream_manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { Connection, Peer, PeerId, Stream } from "@libp2p/interface";
import { expect } from "chai";
import sinon from "sinon";

import { StreamManager } from "./stream_manager.js";

const MULTICODEC = "/test";

describe("StreamManager", () => {
let eventTarget: EventTarget;
let streamManager: StreamManager;

const mockPeer: Peer = {
id: {
toString() {
return "1";
}
}
} as unknown as Peer;

beforeEach(() => {
eventTarget = new EventTarget();
streamManager = new StreamManager(
MULTICODEC,
() => [],
eventTarget.addEventListener.bind(eventTarget)
);
});

it("should return usable stream attached to connection", async () => {
for (const writeStatus of ["ready", "writing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];

streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("1");
}
});

it("should throw if no connection provided", async () => {
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [];

let error: Error | undefined;
try {
await streamManager.getStream(mockPeer);
} catch (e) {
error = e as Error;
}

expect(error).not.to.be.undefined;
expect(error?.message).to.include(mockPeer.id.toString());
expect(error?.message).to.include(MULTICODEC);
});

it("should create a new stream if no existing for protocol found", async () => {
for (const writeStatus of ["done", "closed", "closing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];

const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);

con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("2");

expect(newStreamSpy.calledOnce).to.be.true;
expect(newStreamSpy.calledWith(MULTICODEC)).to.be.true;
}
});

it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } })
);

expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});

it("peer:update - should schedule stream creation IF protocol hit AND no stream found on connection", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);

expect(scheduleNewStreamSpy.calledOnce).to.be.true;
});

it("peer:update - should not schedule stream creation IF protocol hit AND stream found on connection", async () => {
const con1 = createMockConnection();
con1.streams = [
createMockStream({
id: "1",
protocol: MULTICODEC,
writeStatus: "writable"
})
];
streamManager["getConnections"] = (_id) => [con1];

const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;

eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);

expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});
});

type MockConnectionOptions = {
status?: string;
open?: number;
};

function createMockConnection(options: MockConnectionOptions = {}): Connection {
return {
status: options.status || "open",
timeline: {
open: options.open || 1
}
} as Connection;
}

type MockStreamOptions = {
id?: string;
protocol?: string;
writeStatus?: string;
};

function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
} as Stream;
}
Loading
Loading