Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PaginationCursor update from dwn-sdk-js #15

Merged
merged 11 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tbd54566975/dwn-sql-store",
"version": "0.2.6",
"version": "0.2.7",
"description": "SQL backed implementations of DWN MessageStore, DataStore, and EventLog",
"type": "module",
"license": "Apache-2.0",
Expand All @@ -21,7 +21,7 @@
"react-native": "./dist/esm/src/main.js",
"dependencies": {
"@ipld/dag-cbor": "^9.0.5",
"@tbd54566975/dwn-sdk-js": "0.2.10",
"@tbd54566975/dwn-sdk-js": "0.2.13",
"kysely": "0.26.3",
"multiformats": "12.0.1",
"readable-stream": "4.4.2"
Expand Down
47 changes: 25 additions & 22 deletions src/event-log-sql.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { Database } from './database.js';
import type { EventLog, GetEventsOptions, Filter } from '@tbd54566975/dwn-sdk-js';
import { Kysely } from 'kysely';
import type { EventLog, Filter, PaginationCursor } from '@tbd54566975/dwn-sdk-js';

import { Dialect } from './dialect/dialect.js';
import { filterSelectQuery } from './utils/filter.js';
import { Kysely } from 'kysely';
import { sanitizeFilters, sanitizeIndexes } from './utils/sanitize.js';

export class EventLogSql implements EventLog {
Expand Down Expand Up @@ -90,18 +91,18 @@ export class EventLogSql implements EventLog {

async getEvents(
tenant: string,
options?: GetEventsOptions
): Promise<string[]> {
cursor?: PaginationCursor
): Promise<{events: string[], cursor?: PaginationCursor }> {

// get events is simply a query without any filters. gets all events beyond the cursor.
return this.queryEvents(tenant, [], options?.cursor);
return this.queryEvents(tenant, [], cursor);
}

async queryEvents(
tenant: string,
filters: Filter[],
cursor?: string
): Promise<string[]> {
cursor?: PaginationCursor
): Promise<{events: string[], cursor?: PaginationCursor }> {
if (!this.#db) {
throw new Error(
'Connection to database not open. Call `open` before using `queryEvents`.'
Expand All @@ -110,6 +111,7 @@ export class EventLogSql implements EventLog {

let query = this.#db
.selectFrom('eventLog')
.select('watermark')
.select('messageCid')
.where('tenant', '=', tenant);

Expand All @@ -120,35 +122,36 @@ export class EventLogSql implements EventLog {
}

if(cursor !== undefined) {
const messageCid = cursor;
query = query.where(({ eb, selectFrom }) => {

// fetch the watermark of the messageCid cursor
const cursor = selectFrom('eventLog')
.select('watermark')
.where('tenant', '=', tenant)
.where('messageCid', '=', messageCid)
.limit(1);

return eb('watermark', '>' , cursor);
// eventLog in the sql store uses the watermark cursor value which is a number in SQL
// if not we will return empty results
const cursorValue = cursor.value as number;
const cursorMessageCid = cursor.messageCid;

query = query.where(({ eb, refTuple, tuple }) => {
// https://kysely-org.github.io/kysely-apidoc/interfaces/ExpressionBuilder.html#refTuple
return eb(refTuple('watermark', 'messageCid'), '>', tuple(cursorValue, cursorMessageCid));
});
}

query = query.orderBy('watermark', 'asc');
query = query.orderBy('watermark', 'asc').orderBy('messageCid', 'asc');

const events: string[] = [];
// we always return a cursor with the event log query, so we set the return cursor to the properties of the last item.
let returnCursor: PaginationCursor | undefined;
if (this.#dialect.isStreamingSupported) {
for await (let { messageCid } of query.stream()) {
for await (let { messageCid, watermark: value } of query.stream()) {
events.push(messageCid);
returnCursor = { messageCid, value };
}
} else {
const results = await query.execute();
for (let { messageCid } of results) {
for (let { messageCid, watermark: value } of results) {
events.push(messageCid);
returnCursor = { messageCid, value };
}
}

return events;
return { events, cursor: returnCursor };
}

async deleteEventsByCid(
Expand Down
73 changes: 39 additions & 34 deletions src/message-store-sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import {
executeUnlessAborted,
Filter,
GenericMessage,
Message,
MessageStore,
MessageStoreOptions,
MessageSort,
Pagination,
SortDirection
SortDirection,
PaginationCursor
} from '@tbd54566975/dwn-sdk-js';
import { Kysely } from 'kysely';
import { Database } from './database.js';
Expand Down Expand Up @@ -176,7 +176,7 @@ export class MessageStoreSql implements MessageStore {
messageSort?: MessageSort,
pagination?: Pagination,
options?: MessageStoreOptions
): Promise<{ messages: GenericMessage[], cursor?: string }> {
): Promise<{ messages: GenericMessage[], cursor?: PaginationCursor}> {
if (!this.#db) {
throw new Error(
'Connection to database not open. Call `open` before using `query`.'
Expand All @@ -197,22 +197,18 @@ export class MessageStoreSql implements MessageStore {
query = filterSelectQuery(filters, query);

// extract sort property and direction from the supplied messageSort
const { property: sortProperty, direction: sortDirection } = this.getOrderBy(messageSort);
const { property: sortProperty, direction: sortDirection } = this.extractSortProperties(messageSort);

if(pagination?.cursor !== undefined) {
const messageCid = pagination.cursor;
query = query.where(({ eb, selectFrom, refTuple }) => {
const direction = sortDirection === SortDirection.Ascending ? '>' : '<';

// fetches the cursor as a sort property tuple from the database based on the messageCid.
const cursor = selectFrom('messageStore')
.select([sortProperty, 'messageCid'])
.where('tenant', '=', tenant)
.where('messageCid', '=', messageCid)
.limit(1).$asTuple(sortProperty, 'messageCid');
// currently the sort property is explicitly either `dateCreated` | `messageTimestamp` | `datePublished` which are all strings
// TODO: https://github.com/TBD54566975/dwn-sdk-js/issues/664 to handle the edge case
const cursorValue = pagination.cursor.value as string;
const cursorMessageId = pagination.cursor.messageCid;

query = query.where(({ eb, refTuple, tuple }) => {
const direction = sortDirection === SortDirection.Ascending ? '>' : '<';
// https://kysely-org.github.io/kysely-apidoc/interfaces/ExpressionBuilder.html#refTuple
return eb(refTuple(sortProperty, 'messageCid'), direction, cursor);
return eb(refTuple(sortProperty, 'messageCid'), direction, tuple(cursorValue, cursorMessageId));
});
}

Expand All @@ -232,11 +228,9 @@ export class MessageStoreSql implements MessageStore {
options?.signal
);

// extracts the full encoded message from the stored blob for each result item.
const messages: Promise<GenericMessage>[] = results.map((r:any) => this.parseEncodedMessage(r.encodedMessageBytes, r.encodedData, options));

// returns the pruned the messages, since we have and additional record from above, and a potential messageCid cursor
return this.getPaginationResults(messages, pagination?.limit);
// prunes the additional requested message, if it exists, and adds a cursor to the results.
// also parses the encoded message for each of the returned results.
return this.processPaginationResults(results, sortProperty, pagination?.limit, options);
}

async delete(
Expand Down Expand Up @@ -298,30 +292,41 @@ export class MessageStoreSql implements MessageStore {
}

/**
* Gets the pagination Message Cid if there are additional messages to paginate.
* Processes the paginated query results.
* Builds a pagination cursor if there are additional messages to paginate.
* Accepts more messages than the limit, as we query for additional records to check if we should paginate.
*
* @param messages a list of messages, potentially larger than the provided limit.
* @param limit the maximum number of messages to be returned
*
* @returns the pruned message results and an optional messageCid cursor
* @returns the pruned message results and an optional pagination cursor
*/
private async getPaginationResults(
messages: Promise<GenericMessage>[], limit?: number
): Promise<{ messages: GenericMessage[], cursor?: string }>{
if (limit !== undefined && messages.length > limit) {
messages = messages.slice(0, limit);
const lastMessage = messages.at(-1);
return {
messages : await Promise.all(messages),
cursor : lastMessage ? await Message.getCid(await lastMessage) : undefined
};
private async processPaginationResults(
results: any[],
sortProperty: string,
limit?: number,
options?: MessageStoreOptions,
): Promise<{ messages: GenericMessage[], cursor?: PaginationCursor}> {
// we queried for one additional message to determine if there are any additional messages beyond the limit
// we now check if the returned results are greater than the limit, if so we pluck the last item out of the result set
// the cursor is always the last item in the *returned* result so we use the last item in the remaining result set to build a cursor
let cursor: PaginationCursor | undefined;
if (limit !== undefined && results.length > limit) {
results = results.slice(0, limit);
const lastMessage = results.at(-1);
const cursorValue = lastMessage[sortProperty];
cursor = { messageCid: lastMessage.messageCid, value: cursorValue };
}

return { messages: await Promise.all(messages) };
// extracts the full encoded message from the stored blob for each result item.
const messages: Promise<GenericMessage>[] = results.map(r => this.parseEncodedMessage(r.encodedMessageBytes, r.encodedData, options));
return { messages: await Promise.all(messages), cursor };
}

private getOrderBy(
/**
* Extracts the appropriate sort property and direction given a MessageSort object.
*/
private extractSortProperties(
messageSort?: MessageSort
):{ property: 'dateCreated' | 'datePublished' | 'messageTimestamp', direction: SortDirection } {
if(messageSort?.dateCreated !== undefined) {
Expand Down