Skip to content

Commit

Permalink
feat(core): query results ordering on custom fields (#2864)
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulLeCam authored Aug 4, 2023
1 parent 832b97a commit d56a13f
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 84 deletions.
1 change: 1 addition & 0 deletions packages/cli/src/daemon/collection-queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export function collectionQuery(query: Record<string, any>): BaseQuery & Paginat
account: query.account,
filter: query.filter,
queryFilters: query.queryFilters,
sorting: query.sorting,
...pagination,
}
} catch (e) {
Expand Down
5 changes: 3 additions & 2 deletions packages/common/src/index-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ export type RangeValueFilter<T extends string | number> =
/**
* String or number field value filter
*/
export type ScalarValueFilter<T extends string | number = string | number> = CommonValueFilter<T> &
RangeValueFilter<T>
export type ScalarValueFilter<T extends string | number = string | number> =
| CommonValueFilter<T>
| RangeValueFilter<T>

/**
* Any supported field value filter on an object
Expand Down
239 changes: 160 additions & 79 deletions packages/core/src/indexing/insertion-order.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type Knex } from 'knex'
import * as uint8arrays from 'uint8arrays'
import { StreamID } from '@ceramicnetwork/streamid'
import type { BaseQuery, Page, Pagination } from '@ceramicnetwork/common'
import type { BaseQuery, Page, Pagination, SortOrder, Sorting } from '@ceramicnetwork/common'
import {
BackwardPaginationQuery,
ForwardPaginationQuery,
Expand All @@ -11,13 +11,25 @@ import {
import { asTableName } from './as-table-name.util.js'
import { UnsupportedOrderingError } from './unsupported-ordering-error.js'
import { addColumnPrefix } from './column-name.util.js'
import { convertQueryFilter, DATA_FIELD } from './query-filter-converter.js'
import { contentKey, convertQueryFilter, DATA_FIELD } from './query-filter-converter.js'
import { parseQueryFilters } from './query-filter-parser.js'

type SelectedRequired = { stream_id: string; last_anchored_at: number; created_at: number }
type SelectedOptional = Record<string, boolean | number | string>
type Selected = SelectedRequired & SelectedOptional
type QueryFunc = (bldr: Knex.QueryBuilder<any, any>) => Knex.QueryBuilder<any, any>
type StreamContent = Record<string, unknown>
type QueryResult = {
stream_id: string
last_anchored_at?: number
created_at: number
stream_content: string
}
type QueryBuilder = Knex.QueryBuilder<unknown, Array<QueryResult>>

/**
* Stream `id` is always present in cursor, with the `value` either a record of content keys and values (if custom ordering is provided) or the `created_at` field value as fallback, based on the `type` value
*/
type CursorData<Content extends StreamContent = StreamContent> = { id: string } & (
| { type: 'timestamp'; value: number }
| { type: 'content'; value: Content }
)

/**
* Contains functions to transform (parse and stringify) GraphQL cursors
Expand All @@ -28,45 +40,69 @@ type QueryFunc = (bldr: Knex.QueryBuilder<any, any>) => Knex.QueryBuilder<any, a
abstract class Cursor {
/**
* Decode cursor from base64url as JSON.
* Return `undefined` if +cursor+ is `undefined` or `null`.
*/
static parse(cursor: string): any {
return JSON.parse(uint8arrays.toString(uint8arrays.fromString(cursor, 'base64url')))
static parse<Content extends StreamContent = StreamContent>(
cursor: string | undefined
): CursorData<Content> {
return cursor
? JSON.parse(uint8arrays.toString(uint8arrays.fromString(cursor, 'base64url')))
: undefined
}

/**
* base64url-encode cursor from +input+ object.
* Return `undefined` if +input+ object is `undefined` or `null`.
*/
static stringify(input: any): string | undefined {
if (input) {
return uint8arrays.toString(uint8arrays.fromString(JSON.stringify(input)), 'base64url')
} else {
static stringify(
input: QueryResult | undefined,
orderByKeys: Array<string> = []
): string | undefined {
if (input == null) {
return undefined
}
}
}

/**
* Prepare insertion cursor.
*/
function asInsertionCursor(input: { created_at: number } | undefined) {
if (!input) return undefined
return { created_at: input.created_at }
let cursor: CursorData
if (orderByKeys.length === 0) {
// Use `created_at` field
cursor = { type: 'timestamp', id: input.stream_id, value: input.created_at }
} else {
// Use custom content fields
const content =
typeof input.stream_content === 'string'
? JSON.parse(input.stream_content)
: input.stream_content
cursor = { type: 'content', id: input.stream_id, value: {} }
for (const key of orderByKeys) {
if (content[key] != null) {
cursor.value[key] = content[key]
}
}
}

return uint8arrays.toString(uint8arrays.fromString(JSON.stringify(cursor)), 'base64url')
}
}

const REVERSE_ORDER = {
const REVERSE_ORDER: Record<SortOrder, SortOrder> = {
ASC: 'DESC',
DESC: 'ASC',
}

type ComparisonSign = '>' | '<'

function getComparisonSign(order: SortOrder = 'ASC', reverse = false): ComparisonSign {
return order === 'ASC' ? (reverse ? '<' : '>') : reverse ? '>' : '<'
}

/**
* Reverse ASC to DESC, and DESC to ASC in an order clause.
*/
function reverseOrder<T extends { order: string }>(entries: Array<T>): Array<T> {
function reverseOrder<T extends { order: SortOrder }>(entries: Array<T>): Array<T> {
return entries.map((entry) => ({ ...entry, order: REVERSE_ORDER[entry.order] }))
}

const INSERTION_ORDER = [{ column: 'created_at', order: 'ASC' }]
const INSERTION_ORDER = [{ column: 'created_at', order: 'ASC' as const }]

/**
* Insertion order: created_at DESC.
Expand All @@ -75,48 +111,49 @@ export class InsertionOrder {
constructor(private readonly dbConnection: Knex) {}

async page(query: BaseQuery & Pagination): Promise<Page<StreamID>> {
const orderByKeys = Object.keys(query.sorting ?? {})
const pagination = parsePagination(query)
const paginationKind = pagination.kind
switch (paginationKind) {
case PaginationKind.FORWARD: {
const limit = pagination.first
const response: Array<Selected> = await this.forwardQuery(query, pagination)
const response = await this.forwardQuery(query, pagination)
const entries = response.slice(0, limit)
const firstEntry = entries[0]
const lastEntry = entries[entries.length - 1]
return {
edges: entries.map((row) => {
return {
cursor: Cursor.stringify(row),
cursor: Cursor.stringify(row, orderByKeys),
node: StreamID.fromString(row.stream_id),
}
}),
pageInfo: {
hasNextPage: response.length > limit,
hasPreviousPage: false,
endCursor: Cursor.stringify(asInsertionCursor(lastEntry)),
startCursor: Cursor.stringify(asInsertionCursor(firstEntry)),
endCursor: Cursor.stringify(lastEntry, orderByKeys),
startCursor: Cursor.stringify(firstEntry, orderByKeys),
},
}
}
case PaginationKind.BACKWARD: {
const limit = pagination.last
const response: Array<Selected> = await this.backwardQuery(query, pagination)
const response = await this.backwardQuery(query, pagination)
const entries = response.slice(-limit)
const firstEntry = entries[0]
const lastEntry = entries[entries.length - 1]
return {
edges: entries.map((row) => {
return {
cursor: Cursor.stringify(row),
cursor: Cursor.stringify(row, orderByKeys),
node: StreamID.fromString(row.stream_id),
}
}),
pageInfo: {
hasNextPage: false,
hasPreviousPage: response.length > limit,
endCursor: Cursor.stringify(asInsertionCursor(lastEntry)),
startCursor: Cursor.stringify(asInsertionCursor(firstEntry)),
endCursor: Cursor.stringify(lastEntry, orderByKeys),
startCursor: Cursor.stringify(firstEntry, orderByKeys),
},
}
}
Expand All @@ -128,72 +165,116 @@ export class InsertionOrder {
/**
* Forward query: traverse from the most recent to the last.
*/
private forwardQuery(
private async forwardQuery(
query: BaseQuery,
pagination: ForwardPaginationQuery
): Knex.QueryBuilder<unknown, Array<Selected>> {
const tableName = asTableName(query.model)
const queryFunc = this.query(query, false)
let base = queryFunc(this.dbConnection.from(tableName)).limit(pagination.first + 1)
if (pagination.after) {
const after = Cursor.parse(pagination.after)
base = base.where('created_at', '>', after.created_at)
}
return base
): Promise<Array<QueryResult>> {
return await this.query(query, false, Cursor.parse(pagination.after)).limit(
pagination.first + 1
)
}

/**
* Backward query: traverse from the last to the most recent.
*/
private backwardQuery(
private async backwardQuery(
query: BaseQuery,
pagination: BackwardPaginationQuery
): Knex.QueryBuilder<unknown, Array<Selected>> {
const tableName = asTableName(query.model)
const queryFunc = this.query(query, true)
return this.dbConnection
.select('*')
.from((bldr) => {
let subquery = queryFunc(bldr.from(tableName)).limit(pagination.last + 1)
if (pagination.before) {
const before = Cursor.parse(pagination.before)
subquery = subquery.where('created_at', '<', before.created_at)
}
return subquery.as('T')
})
.orderBy(INSERTION_ORDER)
): Promise<Array<QueryResult>> {
const response = await this.query(query, true, Cursor.parse(pagination.before)).limit(
pagination.last + 1
)
// Reverse response as results are returned in descending order
response.reverse()
return response
}

private query(query: BaseQuery, isReverseOrder: boolean): QueryFunc {
let converted = null
if (query.queryFilters) {
const parsed = parseQueryFilters(query.queryFilters)
converted = convertQueryFilter(parsed)
private query(query: BaseQuery, isReverseOrder: boolean, cursor?: CursorData): QueryBuilder {
let builder: QueryBuilder = this.dbConnection
.from(asTableName(query.model))
.columns(['stream_id', 'last_anchored_at', 'created_at', DATA_FIELD])
.select()
// Handle filters (account, fields and/or legacy relations)
builder = this.applyFilters(builder, query)
const sorting = query.sorting ?? {}
// Handle cursor if present
if (cursor != null) {
builder = this.applyCursor(builder, cursor, isReverseOrder, sorting)
}
return (bldr) => {
let base = bldr.columns(['stream_id', 'last_anchored_at', 'created_at', DATA_FIELD]).select()
// Handle ordering
builder = this.applySorting(builder, isReverseOrder, sorting)
return builder
}

if (converted) {
base = base.where(converted.where)
}
private applyFilters(builder: QueryBuilder, query: BaseQuery): QueryBuilder {
if (query.account) {
builder = builder.where({ controller_did: query.account })
}

if (isReverseOrder) {
base = base.orderBy(reverseOrder(INSERTION_ORDER))
} else {
base = base.orderBy(INSERTION_ORDER)
if (query.queryFilters) {
const parsed = parseQueryFilters(query.queryFilters)
const converted = convertQueryFilter(parsed)
if (converted) {
builder = builder.where(converted.where)
}
if (query.account) {
base = base.where({ controller_did: query.account })
} else if (query.filter) {
// Handle legacy `filter` object used for relations
for (const [key, value] of Object.entries(query.filter)) {
const filterObj = {}
filterObj[addColumnPrefix(key)] = value
builder = builder.andWhere(filterObj)
}
if (query.filter) {
for (const [key, value] of Object.entries(query.filter)) {
const filterObj = {}
filterObj[addColumnPrefix(key)] = value
base = base.andWhere(filterObj)
}
}

return builder
}

private applyCursor(
builder: QueryBuilder,
cursor: CursorData,
isReverseOrder: boolean,
sorting: Sorting
): QueryBuilder {
if (cursor.type === 'timestamp') {
// Paginate using the `created_at` field when no custom field ordering is provided
builder = builder.where((qb) => {
qb.where('created_at', isReverseOrder ? '<' : '>', cursor.value) // strict next value
.orWhere('created_at', '=', cursor.value) // or current value
.andWhere('stream_id', '>', cursor.id) // with stream ID tie-breaker
})
} else {
// Paginate using previous values of custom fields
for (const [key, value] of Object.entries(cursor.value)) {
const field = contentKey(key)
const sign = getComparisonSign(sorting[key], isReverseOrder)
builder = builder.where((qb) => {
qb.whereRaw(`${field} ${sign} ?`, [value]) // strict next value
.orWhereRaw(`${field} = ?`, [value]) // or current value
.andWhere('stream_id', '>', cursor.id) // with stream ID tie-breaker
})
}
}
return builder
}

return base
private applySorting(
builder: QueryBuilder,
isReverseOrder: boolean,
sorting: Sorting
): QueryBuilder {
const sortingEntries = Object.entries(sorting ?? {})
if (sortingEntries.length === 0) {
// Order by insertion order (`created_at` field) as fallback
builder = builder.orderBy(isReverseOrder ? reverseOrder(INSERTION_ORDER) : INSERTION_ORDER)
} else {
// Order by custom fields
for (const [field, order] of sortingEntries) {
const orderBy = isReverseOrder ? REVERSE_ORDER[order] : order
builder = builder.orderByRaw(`${contentKey(field)} ${orderBy}`)
}
}
// Always order by stream ID as tie-breaker
builder = builder.orderBy('stream_id', 'asc')
return builder
}
}
6 changes: 5 additions & 1 deletion packages/core/src/indexing/query-filter-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import {

export const DATA_FIELD = 'stream_content'

export function contentKey(field: string): string {
return `${DATA_FIELD}->>'${field}'`
}

type DBQuery = Knex.QueryBuilder
type WhereFunc = (DBQuery) => DBQuery

Expand Down Expand Up @@ -111,7 +115,7 @@ function handleWhereQuery(state: ConversionState<ObjectFilter>): ConvertedQueryF
for (const filterKey in state.filter) {
select.push(filterKey)
const value = state.filter[filterKey]
const key = `${DATA_FIELD}->>'${filterKey}'`
const key = contentKey(filterKey)

switch (value.op) {
case 'null': {
Expand Down
Loading

0 comments on commit d56a13f

Please sign in to comment.