From d13003187f7c495475cb935e13aee556347c0a9d Mon Sep 17 00:00:00 2001 From: Jordan Garcia Date: Fri, 10 May 2024 12:25:34 -1000 Subject: [PATCH 1/5] Wait for createDocument to be loaded for subsequent createConnections --- packages/server/src/Hocuspocus.ts | 20 ++- tests/server/onLoadDocument.ts | 211 ++++++++---------------------- 2 files changed, 67 insertions(+), 164 deletions(-) diff --git a/packages/server/src/Hocuspocus.ts b/packages/server/src/Hocuspocus.ts index cfe2135c..e5ee922c 100644 --- a/packages/server/src/Hocuspocus.ts +++ b/packages/server/src/Hocuspocus.ts @@ -72,6 +72,8 @@ export class Hocuspocus { onDestroy: () => new Promise(r => r(null)), } + loadingDocuments: Map> = new Map() + documents: Map = new Map() server?: HocuspocusServer @@ -414,15 +416,23 @@ export class Hocuspocus { /** * Create a new document by the given request */ - public async createDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { - if (this.documents.has(documentName)) { - const document = this.documents.get(documentName) + public createDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { + if (this.loadingDocuments.has(documentName)) { + const documentPromise = this.loadingDocuments.get(documentName) - if (document) { - return document + if (documentPromise) { + return documentPromise } } + const loadDocPromise = this.loadDocument(documentName, request, socketId, connection, context) + + this.loadingDocuments.set(documentName, loadDocPromise) + + return loadDocPromise + } + + async loadDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { const requestHeaders = request.headers ?? {} const requestParameters = getParameters(request) diff --git a/tests/server/onLoadDocument.ts b/tests/server/onLoadDocument.ts index 3fcae8cc..d05eecf5 100644 --- a/tests/server/onLoadDocument.ts +++ b/tests/server/onLoadDocument.ts @@ -1,6 +1,8 @@ import test from 'ava' import { newHocuspocus, newHocuspocusProvider } from '../utils/index.js' +const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)) + test('executes the onLoadDocument callback', async t => { await new Promise(async resolve => { const server = await newHocuspocus({ @@ -130,6 +132,56 @@ test('multiple simultaneous connections do not create multiple documents', async }) }) +test('multiple simultaneous connections wait for the document to be loaded', async t => { + t.plan(6) + + await new Promise(async resolve => { + let resolveOnLoadDocument: () => void = () => {} + + const server = await newHocuspocus({ + onLoadDocument({ document }) { + // delay more accurately simulates a database fetch + return new Promise(async innerResolve => { + resolveOnLoadDocument = () => { + document.getArray('foo').insert(0, ['bar']) + innerResolve(document) + } + }) + }, + }) + + const provider1 = newHocuspocusProvider(server) + const provider2 = newHocuspocusProvider(server) + let provider1Synced = false + let provider2Synced = false + + provider1.on('synced', () => { + provider1Synced = true + const value = provider1.document.getArray('foo').get(0) + t.is(value, 'bar') + }) + provider2.on('synced', () => { + provider2Synced = true + const value = provider2.document.getArray('foo').get(0) + t.is(value, 'bar') + }) + + await sleep(100) + + t.false(provider1Synced, 'provider1Synced') + t.false(provider2Synced, 'provider2Synced') + + resolveOnLoadDocument() + + await sleep(100) + + t.true(provider1Synced, 'provider1Synced') + t.true(provider2Synced, 'provider2Synced') + + resolve('done') + }) +}) + test('has the server instance', async t => { await new Promise(async resolve => { const server = await newHocuspocus({ @@ -164,60 +216,6 @@ test('stops when an error is thrown in onLoadDocument', async t => { }) }) -test('disconnects all clients related to the document when an error is thrown in onLoadDocument', async t => { - const resolvesNeeded = 4 - - await new Promise(async resolve => { - - const server = await newHocuspocus({ - async onLoadDocument() { - return new Promise((resolve, fail) => { - setTimeout(() => { - // eslint-disable-next-line prefer-promise-reject-errors - fail('ERROR') - }, 250) - }) - }, - async onStoreDocument(data) { - t.fail('MUST NOT call onStoreDocument') - }, - }) - - let resolvedNumber = 0 - const resolver = () => { - resolvedNumber += 1 - - if (resolvedNumber >= resolvesNeeded) { - t.is(server.documents.size, 0) - t.is(server.getConnectionsCount(), 0) - resolve('done') - } - } - - const provider1 = newHocuspocusProvider(server, { - onConnect() { - resolver() - }, - onClose(event) { - provider1.disconnect() - resolver() - }, - }) - - const provider2 = newHocuspocusProvider(server, { - onConnect() { - resolver() - }, - onClose() { - provider2.disconnect() - resolver() - }, - }) - - }) - -}) - test('stops when an error is thrown in onLoadDocument, even when authenticated', async t => { await new Promise(async resolve => { const server = await newHocuspocus({ @@ -241,108 +239,3 @@ test('stops when an error is thrown in onLoadDocument, even when authenticated', }) }) }) - -test('if a new connection connects while the previous connection still fetches the document, it will just work properly', async t => { - let callsToOnLoadDocument = 0 - const resolvesNeeded = 11 - - await new Promise(async resolve => { - - let resolvedNumber = 0 - const resolver = () => { - resolvedNumber += 1 - - if (resolvedNumber >= resolvesNeeded) { - t.is(callsToOnLoadDocument, 1) - resolve('done') - } - } - - const server = await newHocuspocus({ - onLoadDocument({ document }) { - return new Promise(async resolve => { - setTimeout(() => { - callsToOnLoadDocument += 1 - document.getArray('foo').insert(0, [`bar-${callsToOnLoadDocument}`]) - resolve(document) - }, 5000) - }) - }, - }) - - let provider1MessagesReceived = 0 - const provider = newHocuspocusProvider(server, { - onSynced({ state }) { - // if (!state) return - t.is(server.documents.size, 1) - - const value = provider.document.getArray('foo').get(0) - t.is(value, 'bar-1') - - setTimeout(() => { - provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider1Synced']) - }, 100) - - resolver() - }, - onMessage() { - if (!provider.isSynced) return - provider1MessagesReceived += 1 - - const value = provider.document.getArray('foo').get(0) - - if (provider1MessagesReceived === 1) { - // do nothing, this is just the ACK for the sync - } else if (provider1MessagesReceived === 2) { - // do nothing, this is just the ACK for the received update (set "bar-updatedAfterProvider1Synced") - } else if (provider1MessagesReceived === 3) { - t.is(value, 'bar-updatedAfterProvider1Synced') - } else { - t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1') - } - - resolver() - }, - }) - - let provider2MessagesReceived = 0 - setTimeout(() => { - const provider2 = newHocuspocusProvider(server, { - onSynced({ state }) { - // if (!state) return - - t.is(server.documents.size, 1) - - const value = provider.document.getArray('foo').get(0) - t.is(value, undefined) // document hasnt loaded yet because it loads for 5sec, but this runs after ~2sec - - resolver() - }, - onMessage(data) { - if (!provider2.isSynced) return - provider2MessagesReceived += 1 - - const value = provider.document.getArray('foo').get(0) - - if (provider2MessagesReceived === 1) { - // do nothing, this is just the ACK for the sync - t.is(value, undefined) - } else if (provider2MessagesReceived === 2) { - // initial state is now synced - t.is(value, undefined) - } else if (provider2MessagesReceived === 3) { - t.is(value, 'bar-updatedAfterProvider1Synced') - setTimeout(() => { - provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider2ReceivedMessageFrom1']) - }, 100) - } else { - t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1') - } - - resolver() - }, - }) - - }, 2000) - }) -}) From eccc6e3681ba3c5a1b99a06045210655cb2ad8f0 Mon Sep 17 00:00:00 2001 From: Jordan Garcia Date: Mon, 13 May 2024 19:42:06 -1000 Subject: [PATCH 2/5] Delete loadingDocument on unloadDocument --- packages/server/src/Hocuspocus.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/server/src/Hocuspocus.ts b/packages/server/src/Hocuspocus.ts index e5ee922c..0bcde9ed 100644 --- a/packages/server/src/Hocuspocus.ts +++ b/packages/server/src/Hocuspocus.ts @@ -572,6 +572,7 @@ export class Hocuspocus { if (!this.documents.has(documentName)) return this.documents.delete(documentName) + this.loadingDocuments.delete(documentName) document.destroy() this.hooks('afterUnloadDocument', { instance: this, documentName }) } From f015426419949263517193fba95b5c2f56f8cb27 Mon Sep 17 00:00:00 2001 From: Jordan Garcia Date: Tue, 14 May 2024 10:21:36 -1000 Subject: [PATCH 3/5] simplify `loadingDoc.has` call --- packages/server/src/Hocuspocus.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/server/src/Hocuspocus.ts b/packages/server/src/Hocuspocus.ts index 0bcde9ed..bce4e95f 100644 --- a/packages/server/src/Hocuspocus.ts +++ b/packages/server/src/Hocuspocus.ts @@ -417,12 +417,9 @@ export class Hocuspocus { * Create a new document by the given request */ public createDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { - if (this.loadingDocuments.has(documentName)) { - const documentPromise = this.loadingDocuments.get(documentName) - - if (documentPromise) { - return documentPromise - } + const existingLoadingDoc = this.loadingDocuments.get(documentName) + if (existingLoadingDoc) { + return existingLoadingDoc } const loadDocPromise = this.loadDocument(documentName, request, socketId, connection, context) From c5de893e8ef4b928accfd9b28e48cd00442eabc8 Mon Sep 17 00:00:00 2001 From: Jan Thurau Date: Fri, 17 May 2024 09:33:21 +0200 Subject: [PATCH 4/5] re-adds some adjusted tests, loadingDocuments promise cleanup --- packages/server/src/Hocuspocus.ts | 11 ++- tests/server/onLoadDocument.ts | 150 ++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/packages/server/src/Hocuspocus.ts b/packages/server/src/Hocuspocus.ts index bce4e95f..dddbefa5 100644 --- a/packages/server/src/Hocuspocus.ts +++ b/packages/server/src/Hocuspocus.ts @@ -418,14 +418,24 @@ export class Hocuspocus { */ public createDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { const existingLoadingDoc = this.loadingDocuments.get(documentName) + if (existingLoadingDoc) { return existingLoadingDoc } + const existingDoc = this.documents.get(documentName) + if (existingDoc) { + return Promise.resolve(existingDoc) + } + const loadDocPromise = this.loadDocument(documentName, request, socketId, connection, context) this.loadingDocuments.set(documentName, loadDocPromise) + loadDocPromise.finally(() => { + this.loadingDocuments.delete(documentName) + }) + return loadDocPromise } @@ -569,7 +579,6 @@ export class Hocuspocus { if (!this.documents.has(documentName)) return this.documents.delete(documentName) - this.loadingDocuments.delete(documentName) document.destroy() this.hooks('afterUnloadDocument', { instance: this, documentName }) } diff --git a/tests/server/onLoadDocument.ts b/tests/server/onLoadDocument.ts index d05eecf5..f072d5bd 100644 --- a/tests/server/onLoadDocument.ts +++ b/tests/server/onLoadDocument.ts @@ -239,3 +239,153 @@ test('stops when an error is thrown in onLoadDocument, even when authenticated', }) }) }) + +test('disconnects all clients related to the document when an error is thrown in onLoadDocument', async t => { + const resolvesNeeded = 2 + + await new Promise(async resolve => { + + const server = await newHocuspocus({ + async onLoadDocument() { + return new Promise((resolve, fail) => { + setTimeout(() => { + // eslint-disable-next-line prefer-promise-reject-errors + fail('ERROR') + }, 250) + }) + }, + async onStoreDocument(data) { + t.fail('MUST NOT call onStoreDocument') + }, + }) + + let resolvedNumber = 0 + const resolver = () => { + resolvedNumber += 1 + + if (resolvedNumber >= resolvesNeeded) { + t.is(server.documents.size, 0) + t.is(server.getConnectionsCount(), 0) + resolve('done') + } + } + + const provider1 = newHocuspocusProvider(server, { + onClose(event) { + provider1.disconnect() + resolver() + }, + }) + + const provider2 = newHocuspocusProvider(server, { + onClose() { + provider2.disconnect() + resolver() + }, + }) + + }) + +}) + +test('if a new connection connects while the previous connection still fetches the document, it will just work properly', async t => { + let callsToOnLoadDocument = 0 + const resolvesNeeded = 10 + + await new Promise(async resolve => { + + let resolvedNumber = 0 + const resolver = () => { + resolvedNumber += 1 + + if (resolvedNumber >= resolvesNeeded) { + t.is(callsToOnLoadDocument, 1) + resolve('done') + } + } + + const server = await newHocuspocus({ + onLoadDocument({ document }) { + return new Promise(async resolve => { + setTimeout(() => { + callsToOnLoadDocument += 1 + document.getArray('foo').insert(0, [`bar-${callsToOnLoadDocument}`]) + resolve(document) + }, 5000) + }) + }, + }) + + let provider1MessagesReceived = 0 + const provider = newHocuspocusProvider(server, { + onSynced({ state }) { + // if (!state) return + t.is(server.documents.size, 1) + + const value = provider.document.getArray('foo').get(0) + t.is(value, 'bar-1') + + setTimeout(() => { + provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider1Synced']) + }, 100) + + resolver() + }, + onMessage() { + if (!provider.isSynced) return + provider1MessagesReceived += 1 + + const value = provider.document.getArray('foo').get(0) + + if (provider1MessagesReceived === 1) { + // do nothing, this is just the ACK for the sync + } else if (provider1MessagesReceived === 2) { + // do nothing, this is just the ACK for the received update (set "bar-updatedAfterProvider1Synced") + } else if (provider1MessagesReceived === 3) { + t.is(value, 'bar-updatedAfterProvider1Synced') + } else { + t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1') + } + + resolver() + }, + }) + + let provider2MessagesReceived = 0 + setTimeout(() => { + const provider2 = newHocuspocusProvider(server, { + onSynced({ state }) { + // if (!state) return + + t.is(server.documents.size, 1) + + const value = provider.document.getArray('foo').get(0) + t.is(value, 'bar-1') + + resolver() + }, + onMessage(data) { + if (!provider2.isSynced) return + provider2MessagesReceived += 1 + + const value = provider.document.getArray('foo').get(0) + + if (provider2MessagesReceived === 1) { + // initial state is now synced + t.is(value, 'bar-1') + } else if (provider2MessagesReceived === 2) { + t.is(value, 'bar-updatedAfterProvider1Synced') + setTimeout(() => { + provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider2ReceivedMessageFrom1']) + }, 100) + } else { + t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1') + } + + resolver() + }, + }) + + }, 2000) + }) +}) From 672784d58e80d887f3062689721617ad3ce36317 Mon Sep 17 00:00:00 2001 From: Jan Thurau Date: Fri, 17 May 2024 10:07:58 +0200 Subject: [PATCH 5/5] re-adds some adjusted tests, loadingDocuments promise cleanup --- packages/server/src/Hocuspocus.ts | 10 +++++++--- tests/server/onLoadDocument.ts | 30 +++++++++++++++++------------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/packages/server/src/Hocuspocus.ts b/packages/server/src/Hocuspocus.ts index dddbefa5..a9d3f78a 100644 --- a/packages/server/src/Hocuspocus.ts +++ b/packages/server/src/Hocuspocus.ts @@ -416,7 +416,7 @@ export class Hocuspocus { /** * Create a new document by the given request */ - public createDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { + public async createDocument(documentName: string, request: Partial>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise { const existingLoadingDoc = this.loadingDocuments.get(documentName) if (existingLoadingDoc) { @@ -432,9 +432,13 @@ export class Hocuspocus { this.loadingDocuments.set(documentName, loadDocPromise) - loadDocPromise.finally(() => { + try { + await loadDocPromise this.loadingDocuments.delete(documentName) - }) + } catch (e) { + this.loadingDocuments.delete(documentName) + throw e + } return loadDocPromise } diff --git a/tests/server/onLoadDocument.ts b/tests/server/onLoadDocument.ts index f072d5bd..3c725b3b 100644 --- a/tests/server/onLoadDocument.ts +++ b/tests/server/onLoadDocument.ts @@ -289,6 +289,8 @@ test('disconnects all clients related to the document when an error is thrown in }) test('if a new connection connects while the previous connection still fetches the document, it will just work properly', async t => { + t.plan(11) + let callsToOnLoadDocument = 0 const resolvesNeeded = 10 @@ -359,7 +361,7 @@ test('if a new connection connects while the previous connection still fetches t t.is(server.documents.size, 1) - const value = provider.document.getArray('foo').get(0) + const value = provider2.document.getArray('foo').get(0) t.is(value, 'bar-1') resolver() @@ -368,21 +370,23 @@ test('if a new connection connects while the previous connection still fetches t if (!provider2.isSynced) return provider2MessagesReceived += 1 - const value = provider.document.getArray('foo').get(0) + setTimeout(() => { + const value = provider2.document.getArray('foo').get(0) - if (provider2MessagesReceived === 1) { + if (provider2MessagesReceived === 1) { // initial state is now synced - t.is(value, 'bar-1') - } else if (provider2MessagesReceived === 2) { - t.is(value, 'bar-updatedAfterProvider1Synced') - setTimeout(() => { - provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider2ReceivedMessageFrom1']) - }, 100) - } else { - t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1') - } + t.is(value, 'bar-1') + } else if (provider2MessagesReceived === 2) { + t.is(value, 'bar-updatedAfterProvider1Synced') + setTimeout(() => { + provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider2ReceivedMessageFrom1']) + }, 100) + } else { + t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1') + } + resolver() + }) - resolver() }, })