diff --git a/package.json b/package.json
index 3d50019..bebfd9e 100644
--- a/package.json
+++ b/package.json
@@ -114,6 +114,7 @@
     "node-cron": "^2.0.3",
     "npm": "^9.9.0",
     "piscina": "^3.2.0",
+    "proper-lockfile": "^4.1.2",
     "ps-node": "^0.1.6",
     "snake-case": "^3.0.4",
     "stream-chunker": "^1.2.8",
diff --git a/src/controllers/cron/update_local_smartapi.ts b/src/controllers/cron/update_local_smartapi.ts
index 502e806..7294c8f 100644
--- a/src/controllers/cron/update_local_smartapi.ts
+++ b/src/controllers/cron/update_local_smartapi.ts
@@ -14,6 +14,7 @@ import { SmartApiOverrides } from "../../types";
 import apiList from "../../config/api_list";
 import MetaKG, { SmartAPISpec } from "@biothings-explorer/smartapi-kg";
 import { redisClient } from "@biothings-explorer/utils";
+import { writeFileWithLock } from "../../utils/common";
 
 const userAgent = `BTE/${process.env.NODE_ENV === "production" ? "prod" : "dev"} Node/${process.version} ${
   process.platform
@@ -325,9 +326,9 @@ async function updateSmartAPISpecs() {
     delete obj._score;
   });
 
-  await fs.writeFile(localFilePath, JSON.stringify({ hits: hits }));
+  await writeFileWithLock(localFilePath, JSON.stringify({ hits: hits }));
   const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits);
-  await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo));
+  await writeFileWithLock(predicatesFilePath, JSON.stringify(predicatesInfo));
 
   // Create a new metakg
   const metakg = new MetaKG();
diff --git a/src/controllers/meta_knowledge_graph.ts b/src/controllers/meta_knowledge_graph.ts
index 7affdc5..e61838b 100644
--- a/src/controllers/meta_knowledge_graph.ts
+++ b/src/controllers/meta_knowledge_graph.ts
@@ -1,10 +1,12 @@
 import meta_kg, { KGQualifiersObject } from "@biothings-explorer/smartapi-kg";
 import { snakeCase } from "snake-case";
+import lockfile from "proper-lockfile";
 import path from "path";
 import PredicatesLoadingError from "../utils/errors/predicates_error";
 const debug = require("debug")("bte:biothings-explorer-trapi:metakg");
 import apiList from "../config/api_list";
 import { supportedLookups } from "@biothings-explorer/query_graph_handler";
+import MetaKG from "@biothings-explorer/smartapi-kg";
 
 interface PredicateInfo {
   predicate: string;
@@ -31,25 +33,50 @@ export default class MetaKnowledgeGraphHandler {
     const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json");
     const predicates = path.resolve(__dirname, "../../data/predicates.json");
     const kg = new meta_kg(smartapi_specs, predicates);
+
     try {
-      if (smartAPIID !== undefined) {
-        debug(`Constructing with SmartAPI ID ${smartAPIID}`);
-        kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID });
-      } else if (teamName !== undefined) {
-        debug(`Constructing with team ${teamName}`);
-        kg.constructMetaKGSync(false, { apiList, teamName: teamName });
-      } else {
-        debug(`Constructing with default`);
-        kg.constructMetaKGSync(true, { apiList });
-      }
-      if (kg.ops.length === 0) {
-        debug(`Found 0 operations`);
-        throw new PredicatesLoadingError("Not Found - 0 operations");
+      // obtain exclusive lock to avoid cron job updating the file
+      // NOTE: we trade off some read parallelism for consistency here
+      const release = await lockfile.lock(smartapi_specs, {
+        retries: {
+          retries: 10,
+          factor: 2,
+          minTimeout: 100,
+          maxTimeout: 1000,
+        },
+        stale: 5000,
+      });
+
+      try {
+        if (smartAPIID !== undefined) {
+          debug(`Constructing with SmartAPI ID ${smartAPIID}`);
+          kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID });
+        } else if (teamName !== undefined) {
+          debug(`Constructing with team ${teamName}`);
+          kg.constructMetaKGSync(false, { apiList, teamName: teamName });
+        } else {
+          debug(`Constructing with default`);
+          kg.constructMetaKGSync(true, { apiList });
+        }
+        if (kg.ops.length === 0) {
+          debug(`Found 0 operations`);
+          throw new PredicatesLoadingError("Not Found - 0 operations");
+        }
+        return kg;
+      } catch (error) {
+        debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`);
+        throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`);
+      } finally {
+        await release();
       }
-      return kg;
     } catch (error) {
-      debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`);
-      throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`);
+      if (error instanceof PredicatesLoadingError) {
+        throw error;
+      }
+      else {
+        debug(`ERROR locking file because ${error}.`);
+        throw new PredicatesLoadingError(`Failed to Lock File: ${error}`);
+      }
     }
   }
 
@@ -86,10 +113,13 @@ export default class MetaKnowledgeGraphHandler {
   }
 
   async getKG(
+    metakg: MetaKG = undefined,
     smartAPIID: string = this.smartAPIID,
     teamName: string = this.teamName,
   ): Promise<{ nodes: {}; edges: any[] }> {
-    const kg = await this._loadMetaKG(smartAPIID, teamName);
+    // read metakg from files if not globally defined
+    const kg = metakg ?? await this._loadMetaKG(smartAPIID, teamName);
+
     let knowledge_graph = {
       nodes: {},
       edges: [],
diff --git a/src/routes/v1/meta_knowledge_graph_v1.ts b/src/routes/v1/meta_knowledge_graph_v1.ts
index 947e98c..204729a 100644
--- a/src/routes/v1/meta_knowledge_graph_v1.ts
+++ b/src/routes/v1/meta_knowledge_graph_v1.ts
@@ -5,6 +5,8 @@ import * as utils from "../../utils/common";
 import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler";
 import { Express, NextFunction, Request, Response, RequestHandler } from "express";
 
+import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg";
+
 class MetaKG {
   setRoutes(app: Express) {
     app
@@ -23,15 +25,12 @@ class MetaKG {
 
   async task(taskInfo: TaskInfo) {
     try {
-      let kg = undefined;
-
-      // read metakg from files if not globally defined
-      if(!taskInfo.data.options.metakg) {
-        const metaKGHandler = new handler(undefined);
-        kg = await metaKGHandler.getKG();
-      } else {
-        kg = taskInfo.data.options.metakg;
-      }
+      const metaKGHandler = new handler(undefined);
+      let metakg = undefined;
+      // initialize MetaKG only if ops are provided because handler logic is built upon that
+      if (taskInfo.data.options.metakg_ops !== undefined)
+        metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops);
+      const kg = await metaKGHandler.getKG(metakg);
       // response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
       return taskResponse(kg);
     } catch (error) {
diff --git a/src/utils/common.ts b/src/utils/common.ts
index 9a3c500..3607bce 100644
--- a/src/utils/common.ts
+++ b/src/utils/common.ts
@@ -2,6 +2,7 @@ import WorkflowError from "./errors/workflow_error";
 import { URL } from "url";
 import yaml2json from "js-yaml";
 import fs from "fs/promises";
+import * as lockfile from 'proper-lockfile';
 import path from "path";
 import { TrapiLog, TrapiSchema, TrapiWorkflow } from "@biothings-explorer/types";
 import { NextFunction, Request, Response } from "express";
@@ -64,3 +65,40 @@ export function filterForLogLevel(logs: TrapiLog[], logLevel: string) {
 export function methodNotAllowed(_req: Request, res: Response, _next: NextFunction) {
   res.status(405).send();
 }
+
+/**
+ * Writes `data` to `filePath` while holding an exclusive proper-lockfile lock on it.
+ *
+ * Lock acquisition is retried with exponential backoff. The lock is always released,
+ * and any locking or write failure is rethrown so callers know the file was not updated.
+ *
+ * @param filePath - path of the file to lock and overwrite
+ * @param data - full new content of the file
+ * @throws if the lock cannot be acquired or the write fails
+ */
+export async function writeFileWithLock(filePath: string, data: string): Promise<void> {
+  let release: (() => Promise<void>) | undefined;
+
+  try {
+    // proper-lockfile resolves the real path by default, which requires the file to exist;
+    // appending an empty string creates it on first run without touching existing content
+    await fs.writeFile(filePath, "", { flag: "a" });
+
+    release = await lockfile.lock(filePath, {
+      retries: {
+        retries: 10, // number of retry attempts
+        factor: 2, // exponential backoff factor
+        minTimeout: 100, // initial retry delay in milliseconds
+        maxTimeout: 1000 // maximum retry delay in milliseconds
+      },
+      stale: 5000 // lock expiration in milliseconds to prevent deadlocks
+    });
+
+    await fs.writeFile(filePath, data);
+  } finally {
+    // errors are deliberately not caught: a swallowed failure would leave stale data unnoticed
+    if (release) {
+      await release();
+    }
+  }
+}
\ No newline at end of file