Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Flag to fully disable pubsub and all cross-node syncing #3286

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/cli/src/ceramic-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ export function makeCeramicConfig(opts: DaemonConfig): CeramicConfig {
syncOverride: SYNC_OPTIONS_MAP[opts.node.syncOverride],
streamCacheLimit: opts.node.streamCacheLimit,
indexing: opts.indexing,
disablePeerDataSync: opts.ipfs.disablePeerDataSync,
disablePeerDataSync:
opts.ipfs.disablePeerDataSync || process.env.CERAMIC_DISABLE_PEER_DATA_SYNC == 'true',
metrics: opts.metrics,
}
if (opts.stateStore?.mode == StateStoreMode.FS) {
Expand Down
11 changes: 5 additions & 6 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export class Dispatcher {
const rustCeramic = EnvironmentUtils.useRustCeramic()
this.enableSync = rustCeramic ? false : enableSync

if (!rustCeramic) {
if (this.enableSync) {
const pubsub = new Pubsub(
_ipfs,
topic,
Expand Down Expand Up @@ -178,10 +178,9 @@ export class Dispatcher {
}

async init() {
if (EnvironmentUtils.useRustCeramic()) {
return
if (this.enableSync) {
this.messageBus.subscribe(this.handleMessage.bind(this))
}
this.messageBus.subscribe(this.handleMessage.bind(this))
}

get shutdownSignal(): ShutdownSignal {
Expand Down Expand Up @@ -498,7 +497,7 @@ export class Dispatcher {
* @param tip - Commit CID
*/
publishTip(streamId: StreamID, tip: CID, model?: StreamID): Subscription {
if (process.env.CERAMIC_DISABLE_PUBSUB_UPDATES == 'true' || EnvironmentUtils.useRustCeramic()) {
if (!this.enableSync) {
return empty().subscribe()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference between CERAMIC_DISABLE_PEER_DATA_SYNC and CERAMIC_DISABLE_PUBSUB_UPDATES ?

I take it this means we could just not publish, or we could not publish and not read or sync with other nodes? but neither applies under rust ceramic?

If we were going to keep this i think it woudl be confusing but if we are going to leave js-ceramic entirely soon i guess it is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CERAMIC_DISABLE_PUBSUB_UPDATES is much more limited in scope. It just disables publishing of update messages. We'll still listen to pubsub and still publish query and response messages.

Honestly it's not really very useful, we put it in once a while ago to try to troubleshoot an issue gitcoin was having, I don't think anyone uses it anymore and we could frankly probably get rid of it.

}

Expand Down Expand Up @@ -621,7 +620,7 @@ export class Dispatcher {
* Gracefully closes the Dispatcher.
*/
async close(): Promise<void> {
if (!EnvironmentUtils.useRustCeramic()) {
if (this.enableSync) {
this.messageBus.unsubscribe()
}
await this.tasks.onIdle()
Expand Down
16 changes: 15 additions & 1 deletion packages/core/src/initialization/stream-loading.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ import { LogSyncer } from '../stream-loading/log-syncer.js'
import { StateManipulator } from '../stream-loading/state-manipulator.js'
import { AnchorValidator } from '../anchor/anchor-service.js'
import { HandlersMap } from '../handlers-map.js'
import { StreamID } from '@ceramicnetwork/streamid'
import { Observable, empty } from 'rxjs'
import type { CID } from 'multiformats/cid'

const noopPubsubQuerier = {
queryNetwork(streamId: StreamID): Observable<CID> {
return empty()
},
}

export function makeStreamLoaderAndUpdater(
logger: DiagnosticsLogger,
Expand All @@ -17,7 +26,12 @@ export function makeStreamLoaderAndUpdater(
streamHandlers: HandlersMap
): [StreamLoader, StreamUpdater] {
const anchorTimestampExtractor = new AnchorTimestampExtractor(logger, dispatcher, anchorValidator)
const tipFetcher = new TipFetcher(dispatcher.messageBus)
if (!dispatcher.messageBus) {
logger.warn("No pubsub querier detected, won't be able to load tips from the network")
}
const tipFetcher = new TipFetcher(
dispatcher.messageBus ? dispatcher.messageBus : noopPubsubQuerier
)
const logSyncer = new LogSyncer(dispatcher)
const stateManipulator = new StateManipulator(logger, streamHandlers, logSyncer, api)
const streamLoader = new StreamLoader(
Expand Down
32 changes: 10 additions & 22 deletions packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ const makeCeramicCore = async (
return core
}

// should pass on v4 when updated from TileDocument

describeIfV3('Cross node syncing disabled', () => {
jest.setTimeout(20000)

Expand Down Expand Up @@ -110,32 +108,22 @@ describeIfV3('Cross node syncing disabled', () => {
it('Stream created and updated on node with peer data sync disabled still loads via other well connected nodes', async () => {
const content0 = { step: 0 }
const content1 = { step: 1 }
const content2 = { step: 2 }
const doc1 = await TileDocument.create(disconnectedCeramic, content0, null, {
anchor: false,
})
await doc1.update(content1, null, { anchor: false })

const doc2 = await TileDocument.load(connectedCeramic, doc1.id)
expect(doc1.content).toEqual(doc2.content)

// Update should also propagate from node with sync disabled to the other node without issue
await doc1.update(content2, null, { anchor: false })

await TestUtils.waitForState(
doc2,
5000,
(state) => state.log.length == 3,
(state) => {
throw new Error(`Sync failed. State: ${StreamUtils.serializeState(state)}`)
}
)

expect(doc1.content).toEqual(content2)
expect(doc1.state.log.length).toEqual(3)

expect(doc2.content).toEqual(content2)
expect(doc2.state.log.length).toEqual(3)
// The disconnected node won't be listening to pubsub so the connected node will only get the
// genesis commit, not the tip from the update.
expect(doc2.content).toEqual(content0)

// Loading at the specific CommitID of the update will work though because the underlying
// commit blocks are still available via bitswap.
const docAtCommit = await TileDocument.load(connectedCeramic, doc1.commitId)
expect(docAtCommit.content).toEqual(content1)
expect(doc1.content).toEqual(docAtCommit.content)
expect(docAtCommit.state.log.length).toEqual(2)
})

it('Updates made on connected node not visible to node with peer data sync disabled', async () => {
Expand Down