Skip to content

Commit

Permalink
Merge branch 'main' into attribute-array
Browse files Browse the repository at this point in the history
  • Loading branch information
colleenXu committed Oct 18, 2024
2 parents 3f6ac20 + f3a5165 commit 3ee617f
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 112 deletions.
11 changes: 3 additions & 8 deletions __test__/unittest/inferred_mode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,14 +557,12 @@ describe('Test InferredQueryHandler', () => {
expect(report).toHaveProperty('querySuccess');
expect(report).toHaveProperty('queryHadResults');
expect(report).toHaveProperty('mergedResults');
expect(report).toHaveProperty('creativeLimitHit');

const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = report;
const { querySuccess, queryHadResults, mergedResults } = report;
expect(querySuccess).toBeTruthy();
expect(queryHadResults).toBeTruthy();
expect(Object.keys(mergedResults)).toHaveLength(2);
expect(Object.values(mergedResults)[0]).toEqual(1);
expect(creativeLimitHit).toBeTruthy();
expect(Object.keys(combinedResponse.message.results)).toHaveLength(3);
expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
0.7836531040612146,
Expand Down Expand Up @@ -730,13 +728,11 @@ describe('Test InferredQueryHandler', () => {
querySuccess: querySuccess1,
queryHadResults: queryHadResults1,
mergedResults: mergedResults1,
creativeLimitHit: creativeLimitHit1,
} = inferredQueryHandler.combineResponse(2, trapiQueryHandler1, qEdgeID, qEdge, combinedResponse, auxGraphSuffixes);

expect(querySuccess1).toBeTruthy();
expect(queryHadResults1).toBeTruthy();
expect(Object.keys(mergedResults1)).toHaveLength(1);
expect(creativeLimitHit1).toBeTruthy();
expect(combinedResponse.message.results['fakeCompound1-fakeDisease1'].analyses[0].score).toEqual(
0.7836531040612146,
);
Expand Down Expand Up @@ -975,6 +971,8 @@ describe('Test InferredQueryHandler', () => {
);
handler.CREATIVE_LIMIT = 1;

// @ts-expect-error don't need the whole thing
global.queryInformation = {};
const response = await handler.query();

expect(queryIsValid).toHaveBeenCalled();
Expand All @@ -992,9 +990,6 @@ describe('Test InferredQueryHandler', () => {
expect(response.message.knowledge_graph.nodes).toHaveProperty('creativeQueryObject');
expect(response.message.results[0].node_bindings).toHaveProperty('creativeQuerySubject');
expect(response.message.results[0].node_bindings).toHaveProperty('creativeQueryObject');
expect(response.logs.map((log) => log.message)).toContain(
'Addition of 1 results from Template 1 meets creative result maximum of 1 (reaching 1 merged). Response will be truncated to top-scoring 1 results. Skipping remaining 2 templates.',
);
});

test('supportedLookups', async () => {
Expand Down
16 changes: 9 additions & 7 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ export default class BatchEdgeQueryHandler {
/**
* @private
*/
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
const executor = new call_api(APIEdges, this.options, redisClient);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...executor.logs];
return records;
}
Expand Down Expand Up @@ -123,18 +123,20 @@ export default class BatchEdgeQueryHandler {
});
}

async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
debug('Node Update Start');
// it's now a single edge but convert to arr to simplify refactoring
qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
const nodeUpdate = new NodesUpdateHandler(qEdges);
// difference is there is no previous edge info anymore
await nodeUpdate.setEquivalentIDs(qEdges);
await nodeUpdate.setEquivalentIDs(qEdges, abortSignal);
await this._rmEquivalentDuplicates(qEdges);
debug('Node Update Success');

if (abortSignal?.aborted) return [];

const cacheHandler = new CacheHandler(this.caching, this.metaKG, this.options);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges, abortSignal);
this.logs = [...this.logs, ...cacheHandler.logs];
let queryRecords: Record[];

Expand All @@ -154,8 +156,8 @@ export default class BatchEdgeQueryHandler {
}
const expanded_APIEdges = this._expandAPIEdges(APIEdges);
debug('Start to query APIEdges....');
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs);
if (queryRecords === undefined) return;
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs, abortSignal);
if (queryRecords === undefined || abortSignal?.aborted) return;
debug('APIEdges are successfully queried....');
queryRecords = await this._postQueryFilter(queryRecords);
debug(`Total number of records is (${queryRecords.length})`);
Expand Down
5 changes: 4 additions & 1 deletion src/cache_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export default class CacheHandler {
);
}

async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
async categorizeEdges(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === 'true') {
return {
cachedRecords: [],
Expand All @@ -123,13 +123,16 @@ export default class CacheHandler {
let cachedRecords: Record[] = [];
debug('Begin edge cache lookup...');
await async.eachSeries(qEdges, async (qEdge) => {
if (abortSignal?.aborted) return;
const qEdgeMetaKGHash = this._hashEdgeByMetaKG(qEdge.getHashedEdgeRepresentation());
const unpackedRecords: Record[] = await new Promise((resolve) => {
const redisID = 'bte:edgeCache:' + qEdgeMetaKGHash;
redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async () => {
try {
const compressedRecordPack = await redisClient.client.hgetallTimeout(redisID);

if (abortSignal?.aborted) resolve([]);

if (compressedRecordPack && Object.keys(compressedRecordPack).length) {
const recordPack = [];

Expand Down
7 changes: 5 additions & 2 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ export default class QueryEdgeManager {

_createBatchQueryHandler(qEdge: QEdge, metaKG: MetaKG): BatchEdgeQueryHandler {
const handler = new BatchEdgeQueryHandler(metaKG, this.options.resolveOutputIDs, {
...this.options,
caching: this.options.caching,
submitter: this.options.submitter,
recordHashEdgeAttributes: config.EDGE_ATTRIBUTES_USED_IN_RECORD_HASH,
Expand Down Expand Up @@ -404,9 +405,11 @@ export default class QueryEdgeManager {
debug(logMessage);
}

async executeEdges(): Promise<boolean> {
async executeEdges(abortSignal?: AbortSignal): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
if (abortSignal?.aborted) return false;

const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
Expand All @@ -423,7 +426,7 @@ export default class QueryEdgeManager {
);
debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
//execute current edge query
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...queryBatchHandler.logs];
if (queryRecords === undefined) return;
// create an edge execution summary
Expand Down
13 changes: 11 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ export default class TRAPIQueryHandler {
];
};

async query(): Promise<void> {
async query(abortSignal?: AbortSignal): Promise<void> {
this._initializeResponse();
await this.addQueryNodes();

Expand Down Expand Up @@ -769,12 +769,21 @@ export default class TRAPIQueryHandler {
}
const manager = new EdgeManager(queryEdges, metaKG, this.subclassEdges, this.options);

const executionSuccess = await manager.executeEdges();
let executionSuccess: boolean;
try {
executionSuccess = await manager.executeEdges(abortSignal);
} catch (error) {
// Make sure we preserve the logs we can
this.logs = [...this.logs, ...manager.logs]
throw error;
}
this.logs = [...this.logs, ...manager.logs];
if (!executionSuccess) {
return;
}

if (abortSignal?.aborted) return;

const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

// update query graph
Expand Down
Loading

0 comments on commit 3ee617f

Please sign in to comment.