Skip to content

Commit

Permalink
Change how pagination works
Browse files Browse the repository at this point in the history
We need to paginate once we fetch the 'flows' entities because it is the only place where we can filter out those flows whose 'deletedAt' property is null
Updated how the query for large dataset works - now we use a recursive function
Updated how the count query result was mapped into a proper result object
  • Loading branch information
manelcecs committed Apr 11, 2024
1 parent a7ec1ed commit 3dcf1ea
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 54 deletions.
6 changes: 5 additions & 1 deletion src/domain-services/flows/flow-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,15 @@ export class FlowService {
// and since we are filtering and paginating it makes no sense to fetch all the data
// just to do the count - this approach is faster and more efficient

return await databaseConnection
const query = databaseConnection
.queryBuilder()
.count('*')
.from('flow')
.where(prepareCondition(conditions));

const [{ count }] = await query;

return Number(count);
}

async getFlowIDsFromEntity(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cond } from '@unocha/hpc-api-core/src/db/util/conditions';
import { Cond, Op } from '@unocha/hpc-api-core/src/db/util/conditions';
import { Service } from 'typedi';
import { FlowService } from '../../flow-service';
import {
Expand All @@ -7,7 +7,6 @@ import {
type FlowSearchStrategyResponse,
} from '../flow-search-strategy';
import {
mapCountResultToCountObject,
mapFlowOrderBy,
prepareFlowConditions,
prepareFlowStatusConditions,
Expand All @@ -34,26 +33,34 @@ export class OnlyFlowFiltersStrategy implements FlowSearchStrategy {
flowConditions = prepareFlowStatusConditions(flowConditions, statusFilter);

// Build conditions object
const searchConditions = {
[Cond.AND]: [flowConditions ?? {}],
// We need to add the condition to filter the deletedAt field
const whereClause = {
[Cond.AND]: [
{
deletedAt: {
[Op.IS_NULL]: true,
},
},
flowConditions ?? {},
],
};

const orderByFlow = mapFlowOrderBy(orderBy);

const [flows, countRes] = await Promise.all([
this.flowService.getFlows({
models,
conditions: searchConditions,
conditions: whereClause,
offset,
orderBy: orderByFlow,
limit,
}),
this.flowService.getFlowsCount(databaseConnection, flowConditions),
this.flowService.getFlowsCount(databaseConnection, whereClause),
]);

// Map count result query to count object
const countObject = mapCountResultToCountObject(countRes);
const countObject = countRes;

return { flows, count: countObject.count };
return { flows, count: countObject };
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { type Database } from '@unocha/hpc-api-core/src/db';
import { Cond } from '@unocha/hpc-api-core/src/db/util/conditions';
import { Service } from 'typedi';
import { FlowService } from '../../flow-service';
Expand Down Expand Up @@ -29,6 +30,8 @@ export class SearchFlowByFiltersStrategy implements FlowSearchStrategy {
private readonly getFlowIdsFromNestedFlowFilters: GetFlowIdsFromNestedFlowFiltersStrategyImpl
) {}

private readonly MAX_DATABASE_CHUNK_SIZE = 5000;

async search(args: FlowSearchArgs): Promise<FlowSearchStrategyResponse> {
const {
models,
Expand Down Expand Up @@ -230,13 +233,6 @@ export class SearchFlowByFiltersStrategy implements FlowSearchStrategy {
flowIDsFromNestedFlowFilters
);

// Obtain the count of the flows that match the filters
const count = deduplicatedFlows.length;

if (count === 0) {
return { flows: [], count: 0 };
}

// After obtaining the count, we need to obtain the flows
// that match the filters
// First we are going to sort the deduplicated flows
Expand All @@ -246,57 +242,130 @@ export class SearchFlowByFiltersStrategy implements FlowSearchStrategy {
sortByFlowIDs
);

// Then we are going to slice the flows using the limit and offset
let reducedFlows: UniqueFlowEntity[];
if (offset !== undefined && limit !== undefined) {
reducedFlows = sortedFlows.slice(offset, offset + limit);
} else {
reducedFlows = sortedFlows;
}
// Store the count
const count = sortedFlows.length;

let flows: FlowEntity[] = [];
const chunkSize = 5000;
if (reducedFlows.length > chunkSize) {
// This should only happen when doing the totalAmountSearch
if (
limit === undefined &&
offset === undefined &&
sortedFlows.length > this.MAX_DATABASE_CHUNK_SIZE
) {
// We need to paginate over the searchConditions
// Then collect the flows

// 1. Generate an array with the chunkSize
for (let i = 0; i < reducedFlows.length; i += chunkSize) {
const chunk = reducedFlows.slice(i, i + chunkSize);
// 2. Generate the searchConditions
// And collect the flows
const chunkSearchConditions = this.buildConditions(chunk, flowFilters);
const flowsChunk = await this.flowService.getFlows({
models,
conditions: chunkSearchConditions,
orderBy: orderByForFlow,
});
flows.push(...flowsChunk);
}
flows = await this.progresiveChunkSearch(
models,
sortedFlows,
0,
this.MAX_DATABASE_CHUNK_SIZE,
[]
);
} else {
// Once the list of elements is reduced, we need to build the conditions
const searchConditions = this.buildConditions(reducedFlows);
flows = await this.flowService.getFlows({
// Store the flows promise
const flowsResponse = await this.progresiveSearch(
models,
conditions: searchConditions,
orderBy: orderByForFlow,
});
sortedFlows,
limit!,
offset ?? 0,
[]
);

// Store the flows
flows = flowsResponse;
}
return { flows, count };
}

async progresiveSearch(
models: Database,
sortedFlows: UniqueFlowEntity[],
limit: number,
offset: number,
flowResponse: FlowEntity[]
): Promise<FlowEntity[]> {
const reducedFlows = sortedFlows.slice(offset, offset + limit);

const whereConditions = this.buildConditions(reducedFlows);

const flows = await this.flowService.getFlows({
models,
conditions: whereConditions,
});

flowResponse.push(...flows);

if (flowResponse.length === limit || reducedFlows.length < limit) {
return flowResponse;
}

// Recursive call
offset += limit;
return await this.progresiveSearch(
models,
sortedFlows,
limit,
offset,
flowResponse
);
}

async progresiveChunkSearch(
models: Database,
sortedFlows: UniqueFlowEntity[],
start: number,
chunkSize: number,
flowResponse: FlowEntity[]
): Promise<FlowEntity[]> {
const reducedFlows = sortedFlows.slice(start, start + chunkSize);

const whereConditions = this.buildConditions(reducedFlows);

const flows = await this.flowService.getFlows({
models,
conditions: whereConditions,
});

flowResponse.push(...flows);

if (
flowResponse.length === sortedFlows.length ||
reducedFlows.length < chunkSize
) {
return flowResponse;
}

// Recursive call
start += chunkSize;
return await this.progresiveSearch(
models,
sortedFlows,
start,
chunkSize,
flowResponse
);
}

buildConditions(
uniqueFlowEntities: UniqueFlowEntity[],
flowFilters?: SearchFlowsFilters
): any {
const whereClauses = uniqueFlowEntities.map((flow) => ({
[Cond.AND]: [{ id: flow.id }, { versionID: flow.versionID }],
[Cond.AND]: [
{ id: flow.id },
{ versionID: flow.versionID },
{ deletedAt: null },
],
}));

if (flowFilters) {
const flowConditions = prepareFlowConditions(flowFilters);
return {
[Cond.AND]: [flowConditions, { [Cond.OR]: whereClauses }],
[Cond.AND]: [
{ deletedAt: null },
flowConditions,
{ [Cond.OR]: whereClauses },
],
};
}

Expand Down
7 changes: 0 additions & 7 deletions src/domain-services/flows/strategy/impl/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,6 @@ export function prepareFlowConditions(flowFilters: SearchFlowsFilters): any {
return flowConditions;
}

/**
 * Maps the raw result rows of an aggregate count query to a typed count
 * object. The count query yields a single row whose `count` column holds the
 * aggregate value; that first row is returned as-is.
 */
export function mapCountResultToCountObject(countRes: any[]) {
  const [countObject] = countRes as Array<{ count: number }>;
  return countObject;
}

export function mergeUniqueEntities(
listA: UniqueFlowEntity[],
listB: UniqueFlowEntity[]
Expand Down

0 comments on commit 3dcf1ea

Please sign in to comment.