Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add r/w lock on SmartAPI specs and fix all metakg endpoints #53

Merged
merged 8 commits into from
Nov 13, 2024
16 changes: 8 additions & 8 deletions __test__/integration/controllers/association.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@ import assoc from "../../../src/controllers/association";

describe("Test association module", () => {
test("By default, should return all associations", async () => {
const res = assoc();
const res = await assoc();
expect(res.length).toBeGreaterThan(10);
expect(res[0]).toHaveProperty("subject");
expect(res[0]).toHaveProperty("api");
});

test("If sub specified, should only return associations related to the sub", async () => {
const res = assoc("Gene");
const res = await assoc("Gene");
const inputTypes = new Set(res.map(item => item.subject));
expect(Array.from(inputTypes)).toHaveLength(1);
expect(Array.from(inputTypes)).toEqual(["Gene"]);
});

test("If invalid sub specified, should only empty list", async () => {
const res = assoc("Gene1");
const res = await assoc("Gene1");
expect(res).toEqual([]);
});

test("If obj specified, should only return associations related to the obj", async () => {
const res = assoc(undefined, "SmallMolecule");
const res = await assoc(undefined, "SmallMolecule");
const outputTypes = new Set(res.map(item => item.object));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -30,7 +30,7 @@ describe("Test association module", () => {
});

test("If pred specified, should only return associations related to the pred", async () => {
const res = assoc(undefined, undefined, "treats");
const res = await assoc(undefined, undefined, "treats");
const preds = new Set(res.map(item => item.predicate));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -39,7 +39,7 @@ describe("Test association module", () => {
});

test("If api specified, should only return associations related to the api", async () => {
const res = assoc(undefined, undefined, undefined, undefined, "MyGene.info API");
const res = await assoc(undefined, undefined, undefined, undefined, "MyGene.info API");
const apis = new Set(res.map(item => item.api.name));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -48,7 +48,7 @@ describe("Test association module", () => {
});

test("If source specified, should only return associations related to the source", async () => {
const res = assoc(undefined, undefined, undefined, undefined, undefined, "infores:disgenet");
const res = await assoc(undefined, undefined, undefined, undefined, undefined, "infores:disgenet");
const sources = new Set(res.map(item => item.provided_by));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -57,7 +57,7 @@ describe("Test association module", () => {
});

test("If both sub and obj specified, should only return associations related to both sub and obj", async () => {
const res = assoc("Gene", "SmallMolecule");
const res = await assoc("Gene", "SmallMolecule");
const outputTypes = new Set(res.map(item => item.object));
const inputTypes = new Set(res.map(item => item.subject));
expect(Array.from(inputTypes)).toHaveLength(1);
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
"node-cron": "^2.0.3",
"npm": "^9.9.0",
"piscina": "^3.2.0",
"proper-lockfile": "^4.1.2",
"ps-node": "^0.1.6",
"snake-case": "^3.0.4",
"stream-chunker": "^1.2.8",
Expand Down
8 changes: 4 additions & 4 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ export default class Config {
});
this.app.use("/", fastLimiter);
this.app.use("/v1/query", medLimiter);
this.app.use("/v1/team/:team_name/query", medLimiter);
this.app.use("/v1/team/:smartapiID/query", fastLimiter);
this.app.use("/v1/team/:teamName/query", medLimiter);
this.app.use("/v1/team/:smartAPIID/query", fastLimiter);
this.app.use("/v1/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/team/:teamName/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/smartapi/:smartapiID/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/smartapi/:smartAPIID/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/asyncquery", fastLimiter);
this.app.use("/v1/team/:teamName/asyncquery", fastLimiter);
this.app.use("/v1/smartapi/:smartapiID/asyncquery", fastLimiter);
this.app.use("/v1/smartapi/:smartAPIID/asyncquery", fastLimiter);
this.app.use("/queues", fastLimiter);
}

Expand Down
6 changes: 3 additions & 3 deletions src/controllers/association.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ export interface AssocResult {
};
}

export default function (
export default async function (
sub: string = undefined,
obj: string = undefined,
pred: string = undefined,
component: string = undefined,
api: string = undefined,
source: string = undefined,
): AssocResult[] {
): Promise<AssocResult[]> {
const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json");
debug(`smartapi specs loaded: ${smartapi_specs}`);
const predicates = path.resolve(__dirname, "../../data/predicates.json");
debug(`predicates endpoints loaded, ${predicates}`);
const kg = new meta_kg(smartapi_specs, predicates);
debug("metakg initialized");
kg.constructMetaKGSync(true, {});
await kg.constructMetaKGWithFileLock(true, {});
debug(`metakg loaded: ${kg.ops.length} ops`);
const associations: AssocResult[] = [];
const filtered_res = kg.filter({
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/async/asyncquery_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export function getQueryQueue(name: string): BullQueue {
workflow: [
{
id:
job.data.route.includes(":smartapi_id") || job.data.route.includes(":team_name")
job.data.route.includes(":smartAPIID") || job.data.route.includes(":teamName")
? "lookup"
: "lookup_and_score",
},
Expand Down
68 changes: 58 additions & 10 deletions src/controllers/cron/update_local_smartapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import SMARTAPI_EXCLUSIONS from "../../config/smartapi_exclusions";
import getSmartApiOverrideConfig from "../../config/smartapi_overrides";
import { SmartApiOverrides } from "../../types";
import apiList from "../../config/api_list";
import MetaKG, { SmartAPISpec } from "@biothings-explorer/smartapi-kg";
import { redisClient } from "@biothings-explorer/utils";
import MetaKG from "@biothings-explorer/smartapi-kg";
import { lockWithActionAsync, redisClient } from "@biothings-explorer/utils";
import { setTimeout } from "timers/promises";

const userAgent = `BTE/${process.env.NODE_ENV === "production" ? "prod" : "dev"} Node/${process.version} ${
process.platform
Expand Down Expand Up @@ -325,17 +326,42 @@ async function updateSmartAPISpecs() {
delete obj._score;
});

await fs.writeFile(localFilePath, JSON.stringify({ hits: hits }));
await lockWithActionAsync([localFilePath], async () => {
await fs.writeFile(localFilePath, JSON.stringify({ hits: hits }));
}, debug)

const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits);
await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo));
await lockWithActionAsync([predicatesFilePath], async () => {
await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo));
}, debug);

// Create a new metakg
const metakg = new MetaKG();
metakg.constructMetaKGSync(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList });
await metakg.constructMetaKGWithFileLock(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList });
global.metakg = metakg;
global.smartapi = { hits };
global.smartapi = { hits }; // hits is an array, but smartapi must be a dict
};

async function loadGlobalMetaKGReadOnly() {
await setTimeout(30000);
const localFilePath = path.resolve(__dirname, "../../../data/smartapi_specs.json");
const predicatesFilePath = path.resolve(__dirname, "../../../data/predicates.json");

const metakg = new MetaKG(localFilePath, predicatesFilePath);
await metakg.constructMetaKGWithFileLock(true, { apiList });
global.metakg = metakg;

global.smartapi = await lockWithActionAsync(
[localFilePath],
async () => {
const file = await fs.readFile(localFilePath, 'utf-8');
const hits = JSON.parse(file);
return hits;
},
debug
);
}

async function getAPIOverrides(data: { total?: number; hits: any }, overrides: SmartApiOverrides) {
// if only_overrides is enabled, only overridden apis are used
if (overrides.config.only_overrides) {
Expand Down Expand Up @@ -422,12 +448,35 @@ export default function manageSmartApi() {
process.env.INSTANCE_ID && process.env.INSTANCE_ID === "0", // Only one PM2 cluster instance should sync
].every(condition => condition);

/*
We schedule 2 cron jobs, one for non-syncing processes and one for the syncing process.
The non-syncing processes will only read from the local copy of the SmartAPI specs
after a 30 second timeout each time.
Whereas the syncing process will update the local copy of the SmartAPI specs.
We also run them once initially.
*/
if (!should_sync) {
debug(`SmartAPI sync disabled, server process ${process.pid} disabling smartapi updates.`);
debug(`Server process ${process.pid} disabling smartapi updates. SmartAPI files will be read from but not written to.`);
cron.schedule("*/10 * * * *", async () => {
debug(`Reading from SmartAPI specs now at ${new Date().toUTCString()}!`);
try {
await loadGlobalMetaKGReadOnly();
debug("Reading local copy of SmartAPI specs successful.");
} catch (err) {
debug(`Reading local copy of SmartAPI specs failed! The error message is ${err.toString()}`);
}
});

loadGlobalMetaKGReadOnly()
.then(() => {
debug("Reading local copy of SmartAPI specs successful.");
})
.catch(err => {
debug(`Reading local copy of SmartAPI specs failed! The error message is ${err.toString()}`);
});
return;
}

// Otherwise, schedule sync!
cron.schedule("*/10 * * * *", async () => {
debug(`Updating local copy of SmartAPI specs now at ${new Date().toUTCString()}!`);
try {
Expand All @@ -438,7 +487,6 @@ export default function manageSmartApi() {
}
});

// Run at start once
debug(`Running initial update of SmartAPI specs now at ${new Date().toUTCString()}`);
updateSmartAPISpecs()
.then(() => {
Expand All @@ -447,4 +495,4 @@ export default function manageSmartApi() {
.catch(err => {
debug(`Updating local copy of SmartAPI specs failed! The error message is ${err.toString()}`);
});
}
}
14 changes: 10 additions & 4 deletions src/controllers/meta_knowledge_graph.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import meta_kg, { KGQualifiersObject } from "@biothings-explorer/smartapi-kg";
import { snakeCase } from "snake-case";
import lockfile from "proper-lockfile";
import path from "path";
import PredicatesLoadingError from "../utils/errors/predicates_error";
const debug = require("debug")("bte:biothings-explorer-trapi:metakg");
import apiList from "../config/api_list";
import { supportedLookups } from "@biothings-explorer/query_graph_handler";
import MetaKG from "@biothings-explorer/smartapi-kg";

interface PredicateInfo {
predicate: string;
Expand All @@ -31,16 +33,17 @@ export default class MetaKnowledgeGraphHandler {
const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json");
const predicates = path.resolve(__dirname, "../../data/predicates.json");
const kg = new meta_kg(smartapi_specs, predicates);

try {
if (smartAPIID !== undefined) {
debug(`Constructing with SmartAPI ID ${smartAPIID}`);
kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID });
await kg.constructMetaKGWithFileLock(false, { apiList, smartAPIID: smartAPIID });
} else if (teamName !== undefined) {
debug(`Constructing with team ${teamName}`);
kg.constructMetaKGSync(false, { apiList, teamName: teamName });
await kg.constructMetaKGWithFileLock(false, { apiList, teamName: teamName });
} else {
debug(`Constructing with default`);
kg.constructMetaKGSync(true, { apiList });
await kg.constructMetaKGWithFileLock(true, { apiList });
}
if (kg.ops.length === 0) {
debug(`Found 0 operations`);
Expand Down Expand Up @@ -86,10 +89,13 @@ export default class MetaKnowledgeGraphHandler {
}

async getKG(
metakg: MetaKG = undefined,
smartAPIID: string = this.smartAPIID,
teamName: string = this.teamName,
): Promise<{ nodes: {}; edges: any[] }> {
const kg = await this._loadMetaKG(smartAPIID, teamName);
// read metakg from files if not globally defined
const kg = metakg ?? await this._loadMetaKG(smartAPIID, teamName);

let knowledge_graph = {
nodes: {},
edges: [],
Expand Down
4 changes: 2 additions & 2 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export async function runTask(req: Request, res: Response, route: string, useBul
route,
queryGraph: (req.body as TrapiQuery)?.message?.query_graph,
workflow: (req.body as TrapiQuery)?.workflow,
smartAPIID: req.params.smartAPIID,
teamName: req.params.teamName,
options: {
logLevel: (req.body as TrapiQuery).log_level || (req.query.log_level as string),
submitter: (req.body as TrapiQuery).submitter,
smartAPIID: req.params.smartapi_id,
teamName: req.params.team_name,
...req.query,
},
params: req.params,
Expand Down
4 changes: 2 additions & 2 deletions src/routes/bullboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class BullBoardPage {
}
const queues = {
"/v1/asynquery": getQueryQueue("bte_query_queue"),
"/v1/smartapi/{smartapi_id}/asyncquery": getQueryQueue("bte_query_queue_by_api"),
"/v1/team/{team_name}/asyncquery": getQueryQueue("bte_query_queue_by_team"),
"/v1/smartapi/{smartAPIID}/asyncquery": getQueryQueue("bte_query_queue_by_api"),
"/v1/team/{teamName}/asyncquery": getQueryQueue("bte_query_queue_by_team"),
"/v1/query": getQueryQueue("bte_sync_query_queue"),
};

Expand Down
4 changes: 2 additions & 2 deletions src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ export const tasks: TaskByRoute = {
asyncquery_v1_by_team: (taskInfo: TaskInfo) => V1AsyncQueryByTeam.task(taskInfo),
// load MetaKG from global
meta_knowledge_graph_v1: (taskInfo: TaskInfo) => V1MetaKG.task(taskInfo),
meta_knowledge_graph_v1_by_team: (taskInfo: TaskInfo) => V1MetaKGByTeam.task(taskInfo),
meta_knowledge_graph_v1_by_api: (taskInfo: TaskInfo) => V1MetaKGByAPI.task(taskInfo),
// Not threaded due to being lightweight/speed being higher priority
// performance: (taskInfo: TaskInfo) => Performance.task(taskInfo),
// metakg: (taskInfo: TaskInfo) => MetaKG.task(taskInfo),
// meta_knowledge_graph_v1_by_api: (taskInfo: TaskInfo) => V1MetaKGByAPI.task(taskInfo),
// meta_knowledge_graph_v1_by_team: (taskInfo: TaskInfo) => V1MetaKGByTeam.task(taskInfo),
};
2 changes: 1 addition & 1 deletion src/routes/metakg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class MetaKG {
if (req.query.provided_by !== undefined) {
source = utils.removeQuotesFromQuery(req.query.provided_by as string);
}
const assocs = assoc(
const assocs = await assoc(
req.query.subject as string,
req.query.object as string,
req.query.predicate as string,
Expand Down
4 changes: 2 additions & 2 deletions src/routes/v1/asyncquery_v1_by_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import { BteRoute } from "../../types";
class V1AsyncQueryByAPI implements BteRoute {
setRoutes(app: Express) {
app
.route("/v1/smartapi/:smartapi_id/asyncquery")
.route("/v1/smartapi/:smartAPIID/asyncquery")
.post(swaggerValidation.validate, async (req: Request, res: Response, next: NextFunction) => {
const queueData: QueueData = {
route: req.route.path,
queryGraph: req.body?.message.query_graph,
smartAPIID: req.params.smartapi_id,
smartAPIID: req.params.smartAPIID,
workflow: req.body?.workflow,
callback_url: req.body?.callback,
options: {
Expand Down
4 changes: 2 additions & 2 deletions src/routes/v1/asyncquery_v1_by_team.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import { QueueData, TaskInfo } from "@biothings-explorer/types";
class V1AsyncQueryByTeam implements BteRoute {
setRoutes(app: Express) {
app
.route("/v1/team/:team_name/asyncquery")
.route("/v1/team/:teamName/asyncquery")
.post(swaggerValidation.validate, (async (req: Request, res: Response, next: NextFunction) => {
const queueData: QueueData = {
route: req.route.path,
queryGraph: req.body?.message.query_graph,
teamName: req.params.team_name,
teamName: req.params.teamName,
workflow: req.body?.workflow,
callback_url: req.body?.callback,
options: {
Expand Down
17 changes: 8 additions & 9 deletions src/routes/v1/meta_knowledge_graph_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import * as utils from "../../utils/common";
import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler";
import { Express, NextFunction, Request, Response, RequestHandler } from "express";

import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg";

class MetaKG {
setRoutes(app: Express) {
app
Expand All @@ -23,15 +25,12 @@ class MetaKG {

async task(taskInfo: TaskInfo) {
try {
let kg = undefined;

// read metakg from files if not globally defined
if(!taskInfo.data.options.metakg) {
const metaKGHandler = new handler(undefined);
kg = await metaKGHandler.getKG();
} else {
kg = taskInfo.data.options.metakg;
}
const metaKGHandler = new handler(undefined);
let metakg = undefined;
// initialize MetaKG only if ops are provided because handler logic is built upon that
if (taskInfo.data.options.metakg !== undefined)
metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg);
const kg = await metaKGHandler.getKG(metakg);
// response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(kg);
} catch (error) {
Expand Down
Loading
Loading