diff --git a/package-lock.json b/package-lock.json index 97dc02c..a5d4ef0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,16 +1,16 @@ { "name": "@tbd54566975/dwn-sql-store", - "version": "0.2.6", + "version": "0.2.7", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@tbd54566975/dwn-sql-store", - "version": "0.2.6", + "version": "0.2.7", "license": "Apache-2.0", "dependencies": { "@ipld/dag-cbor": "^9.0.5", - "@tbd54566975/dwn-sdk-js": "0.2.10", + "@tbd54566975/dwn-sdk-js": "0.2.13", "kysely": "0.26.3", "multiformats": "12.0.1", "readable-stream": "4.4.2" @@ -895,9 +895,9 @@ "dev": true }, "node_modules/@tbd54566975/dwn-sdk-js": { - "version": "0.2.10", - "resolved": "https://registry.npmjs.org/@tbd54566975/dwn-sdk-js/-/dwn-sdk-js-0.2.10.tgz", - "integrity": "sha512-CoKO8+NciwWNzD4xRoAAgeElqQCXKM4Fc+zEHsUWD0M3E9v67hRWiTHI6AenUfQv1RSEB2H4GHUeUOHuEV72uw==", + "version": "0.2.13", + "resolved": "https://registry.npmjs.org/@tbd54566975/dwn-sdk-js/-/dwn-sdk-js-0.2.13.tgz", + "integrity": "sha512-r1B3HVL+BO7A01GQoc7hlRZA7PYykLnKcUjSZCrvR+K8SoFNoo/y+DHcCHFx9u5fvFgjUx5hXl/LW2c7r+SPXw==", "dependencies": { "@ipld/dag-cbor": "9.0.3", "@js-temporal/polyfill": "0.4.4", diff --git a/package.json b/package.json index 85c6316..4591be1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tbd54566975/dwn-sql-store", - "version": "0.2.6", + "version": "0.2.7", "description": "SQL backed implementations of DWN MessageStore, DataStore, and EventLog", "type": "module", "license": "Apache-2.0", @@ -21,7 +21,7 @@ "react-native": "./dist/esm/src/main.js", "dependencies": { "@ipld/dag-cbor": "^9.0.5", - "@tbd54566975/dwn-sdk-js": "0.2.10", + "@tbd54566975/dwn-sdk-js": "0.2.13", "kysely": "0.26.3", "multiformats": "12.0.1", "readable-stream": "4.4.2" diff --git a/src/event-log-sql.ts b/src/event-log-sql.ts index 596d195..f2e4f87 100644 --- a/src/event-log-sql.ts +++ b/src/event-log-sql.ts @@ -1,8 +1,9 @@ import type { Database } from './database.js'; -import type { EventLog, GetEventsOptions, Filter } from '@tbd54566975/dwn-sdk-js'; -import { Kysely } from 'kysely'; +import type { EventLog, Filter, PaginationCursor } from '@tbd54566975/dwn-sdk-js'; + import { Dialect } from './dialect/dialect.js'; import { filterSelectQuery } from './utils/filter.js'; +import { Kysely } from 'kysely'; import { sanitizeFilters, sanitizeIndexes } from './utils/sanitize.js'; export class EventLogSql implements EventLog { @@ -90,18 +91,18 @@ export class EventLogSql implements EventLog { async getEvents( tenant: string, - options?: GetEventsOptions - ): Promise { + cursor?: PaginationCursor + ): Promise<{events: string[], cursor?: PaginationCursor }> { // get events is simply a query without any filters. gets all events beyond the cursor. - return this.queryEvents(tenant, [], options?.cursor); + return this.queryEvents(tenant, [], cursor); } async queryEvents( tenant: string, filters: Filter[], - cursor?: string - ): Promise { + cursor?: PaginationCursor + ): Promise<{events: string[], cursor?: PaginationCursor }> { if (!this.#db) { throw new Error( 'Connection to database not open. Call `open` before using `queryEvents`.' @@ -110,6 +111,7 @@ export class EventLogSql implements EventLog { let query = this.#db .selectFrom('eventLog') + .select('watermark') .select('messageCid') .where('tenant', '=', tenant); @@ -120,35 +122,36 @@ export class EventLogSql implements EventLog { } if(cursor !== undefined) { - const messageCid = cursor; - query = query.where(({ eb, selectFrom }) => { - - // fetch the watermark of the messageCid cursor - const cursor = selectFrom('eventLog') - .select('watermark') - .where('tenant', '=', tenant) - .where('messageCid', '=', messageCid) - .limit(1); - - return eb('watermark', '>' , cursor); + // eventLog in the sql store uses the watermark cursor value which is a number in SQL + // if not we will return empty results + const cursorValue = cursor.value as number; + const cursorMessageCid = cursor.messageCid; + + query = query.where(({ eb, refTuple, tuple }) => { + // https://kysely-org.github.io/kysely-apidoc/interfaces/ExpressionBuilder.html#refTuple + return eb(refTuple('watermark', 'messageCid'), '>', tuple(cursorValue, cursorMessageCid)); }); } - query = query.orderBy('watermark', 'asc'); + query = query.orderBy('watermark', 'asc').orderBy('messageCid', 'asc'); const events: string[] = []; + // we always return a cursor with the event log query, so we set the return cursor to the properties of the last item. + let returnCursor: PaginationCursor | undefined; if (this.#dialect.isStreamingSupported) { - for await (let { messageCid } of query.stream()) { + for await (let { messageCid, watermark: value } of query.stream()) { events.push(messageCid); + returnCursor = { messageCid, value }; } } else { const results = await query.execute(); - for (let { messageCid } of results) { + for (let { messageCid, watermark: value } of results) { events.push(messageCid); + returnCursor = { messageCid, value }; } } - return events; + return { events, cursor: returnCursor }; } async deleteEventsByCid( diff --git a/src/message-store-sql.ts b/src/message-store-sql.ts index 336ba78..a9b5a75 100644 --- a/src/message-store-sql.ts +++ b/src/message-store-sql.ts @@ -4,12 +4,12 @@ import { executeUnlessAborted, Filter, GenericMessage, - Message, MessageStore, MessageStoreOptions, MessageSort, Pagination, - SortDirection + SortDirection, + PaginationCursor } from '@tbd54566975/dwn-sdk-js'; import { Kysely } from 'kysely'; import { Database } from './database.js'; @@ -176,7 +176,7 @@ export class MessageStoreSql implements MessageStore { messageSort?: MessageSort, pagination?: Pagination, options?: MessageStoreOptions - ): Promise<{ messages: GenericMessage[], cursor?: string }> { + ): Promise<{ messages: GenericMessage[], cursor?: PaginationCursor}> { if (!this.#db) { throw new Error( 'Connection to database not open. Call `open` before using `query`.' @@ -197,22 +197,18 @@ export class MessageStoreSql implements MessageStore { query = filterSelectQuery(filters, query); // extract sort property and direction from the supplied messageSort - const { property: sortProperty, direction: sortDirection } = this.getOrderBy(messageSort); + const { property: sortProperty, direction: sortDirection } = this.extractSortProperties(messageSort); if(pagination?.cursor !== undefined) { - const messageCid = pagination.cursor; - query = query.where(({ eb, selectFrom, refTuple }) => { - const direction = sortDirection === SortDirection.Ascending ? '>' : '<'; - - // fetches the cursor as a sort property tuple from the database based on the messageCid. - const cursor = selectFrom('messageStore') - .select([sortProperty, 'messageCid']) - .where('tenant', '=', tenant) - .where('messageCid', '=', messageCid) - .limit(1).$asTuple(sortProperty, 'messageCid'); + // currently the sort property is explicitly either `dateCreated` | `messageTimestamp` | `datePublished` which are all strings + // TODO: https://github.com/TBD54566975/dwn-sdk-js/issues/664 to handle the edge case + const cursorValue = pagination.cursor.value as string; + const cursorMessageId = pagination.cursor.messageCid; + query = query.where(({ eb, refTuple, tuple }) => { + const direction = sortDirection === SortDirection.Ascending ? '>' : '<'; // https://kysely-org.github.io/kysely-apidoc/interfaces/ExpressionBuilder.html#refTuple - return eb(refTuple(sortProperty, 'messageCid'), direction, cursor); + return eb(refTuple(sortProperty, 'messageCid'), direction, tuple(cursorValue, cursorMessageId)); }); } @@ -232,11 +228,9 @@ export class MessageStoreSql implements MessageStore { options?.signal ); - // extracts the full encoded message from the stored blob for each result item. - const messages: Promise[] = results.map((r:any) => this.parseEncodedMessage(r.encodedMessageBytes, r.encodedData, options)); - - // returns the pruned the messages, since we have and additional record from above, and a potential messageCid cursor - return this.getPaginationResults(messages, pagination?.limit); + // prunes the additional requested message, if it exists, and adds a cursor to the results. + // also parses the encoded message for each of the returned results. + return this.processPaginationResults(results, sortProperty, pagination?.limit, options); } async delete( @@ -298,30 +292,41 @@ export class MessageStoreSql implements MessageStore { } /** - * Gets the pagination Message Cid if there are additional messages to paginate. + * Processes the paginated query results. + * Builds a pagination cursor if there are additional messages to paginate. * Accepts more messages than the limit, as we query for additional records to check if we should paginate. * * @param messages a list of messages, potentially larger than the provided limit. * @param limit the maximum number of messages to be returned * - * @returns the pruned message results and an optional messageCid cursor + * @returns the pruned message results and an optional pagination cursor */ - private async getPaginationResults( - messages: Promise[], limit?: number - ): Promise<{ messages: GenericMessage[], cursor?: string }>{ - if (limit !== undefined && messages.length > limit) { - messages = messages.slice(0, limit); - const lastMessage = messages.at(-1); - return { - messages : await Promise.all(messages), - cursor : lastMessage ? await Message.getCid(await lastMessage) : undefined - }; + private async processPaginationResults( + results: any[], + sortProperty: string, + limit?: number, + options?: MessageStoreOptions, + ): Promise<{ messages: GenericMessage[], cursor?: PaginationCursor}> { + // we queried for one additional message to determine if there are any additional messages beyond the limit + // we now check if the returned results are greater than the limit, if so we pluck the last item out of the result set + // the cursor is always the last item in the *returned* result so we use the last item in the remaining result set to build a cursor + let cursor: PaginationCursor | undefined; + if (limit !== undefined && results.length > limit) { + results = results.slice(0, limit); + const lastMessage = results.at(-1); + const cursorValue = lastMessage[sortProperty]; + cursor = { messageCid: lastMessage.messageCid, value: cursorValue }; } - return { messages: await Promise.all(messages) }; + // extracts the full encoded message from the stored blob for each result item. + const messages: Promise[] = results.map(r => this.parseEncodedMessage(r.encodedMessageBytes, r.encodedData, options)); + return { messages: await Promise.all(messages), cursor }; } - private getOrderBy( + /** + * Extracts the appropriate sort property and direction given a MessageSort object. + */ + private extractSortProperties( messageSort?: MessageSort ):{ property: 'dateCreated' | 'datePublished' | 'messageTimestamp', direction: SortDirection } { if(messageSort?.dateCreated !== undefined) {