diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 94be6647de..2bbdfc0d30 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -7,7 +7,6 @@ export { export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; -export * from "./utils/content_topic.js"; export * from "./waku.js"; export { createLightNode, defaultLibp2p } from "./create/index.js"; diff --git a/packages/sdk/src/utils/content_topic.ts b/packages/sdk/src/utils/content_topic.ts deleted file mode 100644 index 2f10de0e79..0000000000 --- a/packages/sdk/src/utils/content_topic.ts +++ /dev/null @@ -1,125 +0,0 @@ -import type { Multiaddr } from "@multiformats/multiaddr"; -import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; -import { - Callback, - IDecoder, - ISubscriptionSDK, - LightNode, - Protocols -} from "@waku/interfaces"; -import { - contentTopicToPubsubTopic, - shardInfoToPubsubTopics -} from "@waku/utils"; - -import { createLightNode } from "../create/index.js"; - -interface CreateTopicOptions { - waku?: LightNode; - peer: Multiaddr; -} - -// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and -// subscription for that content topic. -async function prepareSubscription( - waku: LightNode, - contentTopic: string, - peer: Multiaddr -): Promise<{ - decoder: IDecoder; - subscription: ISubscriptionSDK; -}> { - // Validate that the Waku node matches assumptions - if (!waku.filter) { - throw new Error("Filter protocol missing from Waku node"); - } - const { shardInfo } = waku.libp2p.components.metadata; - if (!shardInfo) { - throw new Error("Shard info missing from Waku node."); - } - - // Validate content topic and ensure node is configured for its corresponding pubsub topic - const pubsubTopics = shardInfoToPubsubTopics(shardInfo); - const pubsubTopic = contentTopicToPubsubTopic(contentTopic); - if (!pubsubTopics.includes(pubsubTopic)) - throw new Error( - "Content topic does not match any pubsub topic in shard info." - ); - - await waku.dial(peer); - await waitForRemotePeer(waku, [Protocols.Filter]); - - // Create decoder and subscription - let decoder = createDecoder(contentTopic, pubsubTopic); - if (decoder) decoder = decoder ?? decoder; - const { subscription, error } = - await waku.filter.createSubscription(pubsubTopic); - if (error) - throw new Error("Failed to create subscription for content topic."); - - return { decoder, subscription }; -} - -/** - * Creates a subscription and streams all new messages for a content topic. - * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. - * Assumes node is using autosharding. - * @param contentTopic - * @param opts - */ -export async function streamContentTopic( - contentTopic: string, - opts: CreateTopicOptions -): Promise<[ReadableStream, LightNode]> { - opts.waku = - opts.waku ?? - (await createLightNode({ - shardInfo: { contentTopics: [contentTopic] } - })); - const { decoder, subscription } = await prepareSubscription( - opts.waku, - contentTopic, - opts.peer - ); - - // Create a ReadableStream that receives any messages for the content topic - const messageStream = new ReadableStream({ - async start(controller) { - await subscription.subscribe(decoder, (message) => { - controller.enqueue(message); - }); - }, - async cancel() { - await subscription.unsubscribe([contentTopic]); - } - }); - - return [messageStream, opts.waku]; -} - -/** - * Subscribes to new messages for a content topic via callback function. - * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. - * Assumes node is using autosharding. - * @param contentTopic - * @param callback Called every time a new message is received on the content topic - * @param opts - */ -export async function subscribeToContentTopic( - contentTopic: string, - callback: Callback, - opts: CreateTopicOptions -): Promise<{ subscription: ISubscriptionSDK; waku: LightNode }> { - opts.waku = - opts.waku ?? - (await createLightNode({ - shardInfo: { contentTopics: [contentTopic] } - })); - const { decoder, subscription } = await prepareSubscription( - opts.waku, - contentTopic, - opts.peer - ); - await subscription.subscribe(decoder, callback); - return { subscription, waku: opts.waku }; -} diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index c36735224f..bfd124f906 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -1,16 +1,13 @@ import type { Stream } from "@libp2p/interface"; import { isPeerId, PeerId } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, DecodedMessage } from "@waku/core"; +import { ConnectionManager } from "@waku/core"; import type { - Callback, IFilterSDK, ILightPushSDK, IRelay, IStoreSDK, - ISubscriptionSDK, Libp2p, - LightNode, ProtocolCreateOptions, PubsubTopic, Waku @@ -22,7 +19,6 @@ import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter.js"; import { wakuLightPush } from "./protocols/light_push.js"; import { wakuStore } from "./protocols/store.js"; -import { subscribeToContentTopic } from "./utils/content_topic.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -211,19 +207,6 @@ export class WakuNode implements Waku { await this.libp2p.stop(); } - public async subscribeToContentTopic( - contentTopic: string, - peer: Multiaddr, - callback: Callback - ): Promise { - return ( - await subscribeToContentTopic(contentTopic, callback, { - waku: this as LightNode, - peer - }) - ).subscription; - } - public isStarted(): boolean { return this.libp2p.status == "started"; } diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts deleted file mode 100644 index a65804fcad..0000000000 --- a/packages/tests/tests/sdk/content_topic.spec.ts +++ /dev/null @@ -1,181 +0,0 @@ -import { - bytesToUtf8, - createEncoder, - createLightNode, - defaultLibp2p, - LightNode, - Protocols, - streamContentTopic, - subscribeToContentTopic, - utf8ToBytes, - waitForRemotePeer, - WakuNode -} from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - ensureShardingConfigured, - pubsubTopicToSingleShardInfo -} from "@waku/utils"; -import { expect } from "chai"; - -import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src"; - -// skipped: https://github.com/waku-org/js-waku/issues/1914 -describe.skip("SDK: Creating by Content Topic", function () { - const ContentTopic = "/myapp/1/latest/proto"; - const testMessage = "Test123"; - const clusterId = 2; - let nwaku: ServiceNode; - let waku: LightNode; - let waku2: LightNode; - - beforeEach(async function () { - this.timeout(15000); - nwaku = new ServiceNode(makeLogFileName(this) + "1"); - await nwaku.start({ - pubsubTopic: [contentTopicToPubsubTopic(ContentTopic, clusterId)], - lightpush: true, - relay: true, - filter: true, - discv5Discovery: true, - peerExchange: true, - clusterId: clusterId - }); - }); - - afterEach(async function () { - this.timeout(15000); - await tearDownNodes(nwaku, [waku, waku2]); - }); - - it("given a content topic, creates a waku node and filter subscription", async function () { - const expectedPubsubTopic = contentTopicToPubsubTopic( - ContentTopic, - clusterId - ); - - waku = ( - await subscribeToContentTopic(ContentTopic, () => {}, { - peer: await nwaku.getMultiaddrWithId() - }) - ).waku; - - expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic); - }); - - it("given a waku node and content topic, creates a filter subscription", async function () { - const expectedPubsubTopic = contentTopicToPubsubTopic( - ContentTopic, - clusterId - ); - - waku = await createLightNode({ - shardInfo: { contentTopics: [ContentTopic] } - }); - await subscribeToContentTopic(ContentTopic, () => {}, { - waku, - peer: await nwaku.getMultiaddrWithId() - }); - - expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic); - }); - - it("receives messages sent to provided content topic through callback", async function () { - const messages: string[] = []; - waku = ( - await subscribeToContentTopic( - ContentTopic, - (msg) => { - messages.push(bytesToUtf8(msg.payload)); - }, - { - peer: await nwaku.getMultiaddrWithId() - } - ) - ).waku; - - waku2 = await createLightNode({ - shardInfo: { contentTopics: [ContentTopic] } - }); - await waku2.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku2, [Protocols.LightPush]); - const encoder = createEncoder({ - pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic, clusterId) - ), - contentTopic: ContentTopic - }); - await waku2.lightPush?.send(encoder, { - payload: utf8ToBytes(testMessage) - }); - - expect(messages[0]).to.be.eq(testMessage); - }); - - it("receives messages sent to provided content topic through callback (Waku class)", async function () { - const messages: string[] = []; - const shardInfo = ensureShardingConfigured({ - contentTopics: [ContentTopic] - }); - const wakuContentTopic = new WakuNode( - { - pubsubTopics: shardInfo.pubsubTopics - }, - await defaultLibp2p(shardInfo.shardInfo, undefined, {}, undefined), - { - filter: true - } - ); - await wakuContentTopic.subscribeToContentTopic( - ContentTopic, - await nwaku.getMultiaddrWithId(), - (msg) => { - messages.push(bytesToUtf8(msg.payload)); - } - ); - - waku2 = await createLightNode({ - shardInfo: { contentTopics: [ContentTopic] } - }); - await waku2.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku2, [Protocols.LightPush]); - const encoder = createEncoder({ - pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic, clusterId) - ), - contentTopic: ContentTopic - }); - await waku2.lightPush?.send(encoder, { - payload: utf8ToBytes(testMessage) - }); - - expect(messages[0]).to.be.eq(testMessage); - }); - - it("receives messages sent to provided content topic through stream", async function () { - let stream; - [stream, waku] = await streamContentTopic(ContentTopic, { - peer: await nwaku.getMultiaddrWithId() - }); - - waku2 = await createLightNode({ - shardInfo: { contentTopics: [ContentTopic] } - }); - await waku2.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku2, [Protocols.LightPush]); - - const encoder = createEncoder({ - pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( - contentTopicToPubsubTopic(ContentTopic, clusterId) - ), - contentTopic: ContentTopic - }); - await waku2.lightPush?.send(encoder, { - payload: utf8ToBytes(testMessage) - }); - - const reader = stream.getReader(); - const { value: message } = await reader.read(); - expect(bytesToUtf8(message!.payload)).to.be.eq(testMessage); - }); -});