diff --git a/Dockerfile b/Dockerfile index 1dff27cb..3649f569 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN mkdir ./doc && cp -R node_modules/@a11ywatch/protos proto RUN protoc --doc_out=./doc --doc_opt=html,index.html proto/*.proto -FROM node:18.7.0-alpine AS installer +FROM node:18.8.0-alpine AS installer WORKDIR /usr/src/app @@ -21,7 +21,7 @@ RUN apk upgrade --update-cache --available && \ COPY . . RUN npm ci -FROM node:18.7.0-alpine AS builder +FROM node:18.8.0-alpine AS builder WORKDIR /usr/src/app @@ -32,7 +32,7 @@ RUN npm run build RUN rm -R ./node_modules RUN npm install --production -FROM node:18.7-alpine +FROM node:18.8.0-alpine WORKDIR /usr/src/app diff --git a/Dockerfile.dev b/Dockerfile.dev index a93e15b9..55706b22 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -10,7 +10,7 @@ RUN mkdir ./doc && cp -R node_modules/@a11ywatch/protos proto RUN protoc --doc_out=./doc --doc_opt=html,index.html proto/*.proto -FROM node:18.7.0-alpine +FROM node:18.8.0-alpine WORKDIR /usr/src/app diff --git a/package-lock.json b/package-lock.json index 347dd00a..7e6f3e80 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@a11ywatch/core", - "version": "0.5.47", + "version": "0.5.54", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@a11ywatch/core", - "version": "0.5.47", + "version": "0.5.54", "dependencies": { "@a11ywatch/protos": "^0.3.3", "@a11ywatch/website-source-builder": "^0.0.32", @@ -14,7 +14,7 @@ "@fastify/cors": "^7.0.0", "@fastify/rate-limit": "^6.0.1", "@graphql-tools/schema": "^8.5.1", - "@grpc/grpc-js": "1.6.10", + "@grpc/grpc-js": "1.6.12", "@grpc/proto-loader": "0.7.2", "apollo-server": "3.10.1", "apollo-server-core": "3.10.1", @@ -1329,9 +1329,9 @@ } }, "node_modules/@grpc/grpc-js": { - "version": "1.6.10", - "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.6.10.tgz", - "integrity": "sha512-XTX5z/P5kH802MDoVm/rqOil0UwYEOEjf9+NPgfmm5UINIxDzwYaXfVR6z8svCBG8Hlbu/FzkXqhP8J5xaWzSQ==", + "version": "1.6.12", + "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.6.12.tgz", + "integrity": "sha512-JmvQ03OTSpVd9JTlj/K3IWHSz4Gk/JMLUTtW7Zb0KvO1LcOYGATh5cNuRYzCAeDR3O8wq+q8FZe97eO9MBrkUw==", "dependencies": { "@grpc/proto-loader": "^0.7.0", "@types/node": ">=12.12.47" @@ -15688,9 +15688,9 @@ } }, "@grpc/grpc-js": { - "version": "1.6.10", - "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.6.10.tgz", - "integrity": "sha512-XTX5z/P5kH802MDoVm/rqOil0UwYEOEjf9+NPgfmm5UINIxDzwYaXfVR6z8svCBG8Hlbu/FzkXqhP8J5xaWzSQ==", + "version": "1.6.12", + "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.6.12.tgz", + "integrity": "sha512-JmvQ03OTSpVd9JTlj/K3IWHSz4Gk/JMLUTtW7Zb0KvO1LcOYGATh5cNuRYzCAeDR3O8wq+q8FZe97eO9MBrkUw==", "requires": { "@grpc/proto-loader": "^0.7.0", "@types/node": ">=12.12.47" diff --git a/package.json b/package.json index 6588cf0d..7870ec3a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@a11ywatch/core", - "version": "0.5.49", + "version": "0.5.54", "description": "a11ywatch central api", "main": "./server.js", "scripts": { @@ -18,7 +18,7 @@ "@fastify/cors": "^7.0.0", "@fastify/rate-limit": "^6.0.1", "@graphql-tools/schema": "^8.5.1", - "@grpc/grpc-js": "1.6.10", + "@grpc/grpc-js": "1.6.12", "@grpc/proto-loader": "0.7.2", "apollo-server": "3.10.1", "apollo-server-core": "3.10.1", diff --git a/src/core/actions/accessibility/crawl.ts b/src/core/actions/accessibility/crawl.ts index 779c1b26..9b9cdfaa 100644 --- 
a/src/core/actions/accessibility/crawl.ts +++ b/src/core/actions/accessibility/crawl.ts @@ -44,16 +44,17 @@ const trackerProccess = ( { domain, urlMap, userId, shutdown = false }: any, blockEvent?: boolean ) => { - if (!blockEvent && data) { - crawlEmitter.emit(`crawl-${domainName(domain)}-${userId || 0}`, data); - } - - // determine crawl has been processed top level tracking - crawlTrackingEmitter.emit("crawl-processed", { - user_id: userId, - domain, - pages: [urlMap], - shutdown, + process.nextTick(() => { + if (!blockEvent && data) { + crawlEmitter.emit(`crawl-${domainName(domain)}-${userId || 0}`, data); + } + // determine crawl has been processed top level tracking + crawlTrackingEmitter.emit("crawl-processed", { + user_id: userId, + domain, + pages: [urlMap], + shutdown, + }); }); }; @@ -233,96 +234,101 @@ export const crawlPage = async ( // if website record exist update integrity of the data. if (website) { - // if ROOT domain for scan update Website Collection. - if (rootPage) { - const { issuesInfo, ...updateProps } = updateWebsiteProps; - - await collectionUpsert( - updateProps, - [websiteCollection, !!updateWebsiteProps], - { - searchProps: { url: pageUrl, userId }, - } - ); - } + setImmediate(async () => { + // if ROOT domain for scan update Website Collection. + if (rootPage) { + const { issuesInfo, ...updateProps } = updateWebsiteProps; + + await collectionUpsert( + updateProps, + [websiteCollection, !!updateWebsiteProps], + { + searchProps: { url: pageUrl, userId }, + } + ); + } - // if scripts enabled get collection - if (scriptsEnabled) { - [scripts, scriptsCollection] = await ScriptsController().getScript( - { pageUrl, userId, noRetries: true }, - true - ); - - if (script) { - script.userId = userId; - // TODO: look into auto meta reason - if (!scripts?.scriptMeta) { - script.scriptMeta = { - skipContentEnabled: true, - }; + // if scripts enabled get collection + if (scriptsEnabled) { + [scripts, scriptsCollection] = await ScriptsController().getScript( + { pageUrl, userId, noRetries: true }, + true + ); + + if (script) { + script.userId = userId; + // TODO: look into auto meta reason + if (!scripts?.scriptMeta) { + script.scriptMeta = { + skipContentEnabled: true, + }; + } } } - } - - const shouldUpsertCollections = pageConstainsIssues || issueExist; // if issues exist prior or current update collection - // Add to Issues collection if page contains issues or if record should update/delete. - if (shouldUpsertCollections) { - await collectionUpsert(lighthouseData, [pageSpeedCollection, pageSpeed]); // PageInsights - - const { issueMeta, ...analyticsProps } = issuesInfo; - await collectionUpsert( - { - pageUrl, - domain, - userId, - adaScore, - ...analyticsProps, - }, - [analyticsCollection, analytics] - ); // ANALYTICS - - await collectionUpsert( - newIssue, - [issuesCollection, issueExist, !pageConstainsIssues], - { - searchProps: { pageUrl, userId }, - } - ); // ISSUES COLLECTION - } + const shouldUpsertCollections = pageConstainsIssues || issueExist; // if issues exist prior or current update collection + + // Add to Issues collection if page contains issues or if record should update/delete. 
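Note on the deferral pattern in the crawl.ts hunks above: the emitter calls in trackerProccess now run behind process.nextTick and the website-collection writes behind setImmediate, so the caller returns before the fan-out work executes. A minimal sketch of the difference between the two wrappers (illustrative only, not the project's code):

```ts
import { EventEmitter } from "node:events";

// stands in for crawlTrackingEmitter; local to this sketch
const trackingEmitter = new EventEmitter();
trackingEmitter.on("crawl-processed", (payload) => console.log("processed", payload));

// process.nextTick callbacks run before the event loop services pending I/O.
export const emitSoon = (payload: { domain: string; user_id?: number }) => {
  process.nextTick(() => trackingEmitter.emit("crawl-processed", payload));
};

// setImmediate callbacks run on a following loop iteration, after I/O callbacks.
export const emitAfterIO = (payload: { domain: string; user_id?: number }) => {
  setImmediate(() => trackingEmitter.emit("crawl-processed", payload));
};
```

process.nextTick keeps the deferred emit ahead of pending I/O, while setImmediate yields to I/O first, which is presumably why the heavier database writes in crawlPage use the latter.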
+ if (shouldUpsertCollections) { + await collectionUpsert(lighthouseData, [ + pageSpeedCollection, + pageSpeed, + ]); // PageInsights + + const { issueMeta, ...analyticsProps } = issuesInfo; + await collectionUpsert( + { + pageUrl, + domain, + userId, + adaScore, + ...analyticsProps, + }, + [analyticsCollection, analytics] + ); // ANALYTICS + + await collectionUpsert( + newIssue, + [issuesCollection, issueExist, !pageConstainsIssues], + { + searchProps: { pageUrl, userId }, + } + ); // ISSUES COLLECTION + } - // Pages - if ((!newSite && shouldUpsertCollections) || newSite) { - await collectionUpsert( - updateWebsiteProps, - [pagesCollection, newSite, !pageConstainsIssues], // delete collection if issues do not exist - { - searchProps: { url: pageUrl, userId }, - } - ); - } + // Pages + if ((!newSite && shouldUpsertCollections) || newSite) { + await collectionUpsert( + updateWebsiteProps, + [pagesCollection, newSite, !pageConstainsIssues], // delete collection if issues do not exist + { + searchProps: { url: pageUrl, userId }, + } + ); + } - // Add script to collection - if (scriptsEnabled) { - await collectionUpsert(script, [scriptsCollection, scripts]); - } + // Add script to collection + if (scriptsEnabled) { + await collectionUpsert(script, [scriptsCollection, scripts]); + } + }); } // Flatten issues with the array set results without meta. const responseData = { - data: Object.assign({}, updateWebsiteProps, { - issues: subIssues, - }), + data: updateWebsiteProps, }; + responseData.data.issues = subIssues; if (pageConstainsIssues) { - if (sendSub) { - try { - await pubsub.publish(ISSUE_ADDED, { issueAdded: newIssue }); - } catch (_) { - // silent pub sub errors - } - } + sendSub && + setImmediate(async () => { + try { + await pubsub.publish(ISSUE_ADDED, { issueAdded: newIssue }); + } catch (_) { + // silent pub sub errors + } + }); // send email if issues of type error exist for the page. TODO: remove from layer. 
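One caution on the block above (a reviewer note, not part of the patch): with the awaited collectionUpsert calls moved inside setImmediate(async () => { ... }), a rejection is no longer tied to crawlPage's caller and surfaces as an unhandled rejection unless it is caught inside the deferred callback, as the pubsub branch already does. A hedged sketch of a wrapper that keeps the fire-and-forget behaviour while logging failures (deferWrite is a hypothetical name):

```ts
// Hypothetical helper; the task passed in would wrap the collectionUpsert calls.
const deferWrite = (task: () => Promise<void>) => {
  setImmediate(async () => {
    try {
      await task();
    } catch (e) {
      // keep the deferral fire-and-forget, but surface failures instead of crashing
      console.error("deferred persistence failed", e);
    }
  });
};

// usage sketch
deferWrite(async () => {
  // await collectionUpsert(...) calls would go here
});
```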
if (sendEmail && issuesInfo?.errorCount) { diff --git a/src/core/controllers/page-actions/find.ts b/src/core/controllers/page-actions/find.ts index d2cb3173..1616448f 100644 --- a/src/core/controllers/page-actions/find.ts +++ b/src/core/controllers/page-actions/find.ts @@ -2,10 +2,11 @@ import { connect } from "../../../database"; // find page actions by path export const findPageActionsByPath = async ({ path, userId }) => { - let actions; - try { - const [actionsCollection] = await connect("PageActions"); + const [actionsCollection] = await connect("PageActions"); + + let actions = []; + try { const action = await actionsCollection.findOne({ path, userId, diff --git a/src/core/utils/crawl-stream-slim.ts b/src/core/utils/crawl-stream-slim.ts index 57eb45d6..f3700cf2 100644 --- a/src/core/utils/crawl-stream-slim.ts +++ b/src/core/utils/crawl-stream-slim.ts @@ -32,32 +32,36 @@ export const crawlHttpStreamSlim = ( const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`; const crawlListener = (source) => { - const data = source?.data; + setImmediate(() => { + const data = source?.data; - // only send when true - if (data) { - // trim data for sending minimally - if (onlyData) { - data.pageLoadTime = null; - data.issues = null; + // only send when true + if (data) { + // trim data for sending minimally + if (onlyData) { + data.pageLoadTime = null; + data.issues = null; + } + res.raw.write(`${JSON.stringify(data)},`); } - res.raw.write(`${JSON.stringify(data)},`); - } + }); }; crawlEmitter.on(crawlEvent, crawlListener); const crawlComplete = () => { - crawlTrackingEmitter.off(crawlEvent, crawlListener); + setImmediate(() => { + crawlTrackingEmitter.off(crawlEvent, crawlListener); - if (client && client.includes("a11ywatch_cli/")) { - // send extra item for trailing comma handler - res.raw.write(`${JSON.stringify({ url: "", domain: "" })}`, () => { + if (client && client.includes("a11ywatch_cli/")) { + // send extra item for trailing comma handler + res.raw.write(`${JSON.stringify({ url: "", domain: "" })}`, () => { + resolve(true); + }); + } else { resolve(true); - }); - } else { - resolve(true); - } + } + }); }; crawlTrackingEmitter.once( diff --git a/src/core/utils/crawl-stream.ts b/src/core/utils/crawl-stream.ts index 6a5304d1..65cbf7a7 100644 --- a/src/core/utils/crawl-stream.ts +++ b/src/core/utils/crawl-stream.ts @@ -36,30 +36,27 @@ export const crawlHttpStream = ( const domain = getHostName(url); const crawlListener = (source) => { - const data = source?.data; - if (data) { - const issuesFound = data?.issues?.length; + setImmediate(() => { + const data = source?.data; + if (data) { + const issuesFound = data?.issues?.length; - res.raw.write( - `${JSON.stringify({ - data, - message: `${data?.url} has ${issuesFound} issue${ - issuesFound === 1 ? "" : "s" - }`, - success: true, - code: 200, - })},` - ); - } + res.raw.write( + `${JSON.stringify({ + data, + message: `${data?.url} has ${issuesFound} issue${ + issuesFound === 1 ? 
"" : "s" + }`, + success: true, + code: 200, + })},` + ); + } + }); }; - const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`; - - crawlEmitter.on(crawlEvent, crawlListener); - - crawlTrackingEmitter.once( - `crawl-complete-${getKey(domain, undefined, userId)}`, - () => { + const crawlCompleteListener = () => { + setImmediate(() => { crawlTrackingEmitter.off(crawlEvent, crawlListener); // send extra item for trailing comma handler non rpc @@ -78,7 +75,16 @@ export const crawlHttpStream = ( } else { resolve(true); } - } + }); + }; + + const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`; + + crawlEmitter.on(crawlEvent, crawlListener); + + crawlTrackingEmitter.once( + `crawl-complete-${getKey(domain, undefined, userId)}`, + crawlCompleteListener ); }); }; diff --git a/src/event/crawl-tracking.ts b/src/event/crawl-tracking.ts index 7be38ae6..9aab2bc4 100644 --- a/src/event/crawl-tracking.ts +++ b/src/event/crawl-tracking.ts @@ -1,5 +1,6 @@ import { getHostName } from "@a11ywatch/website-source-builder"; import { performance } from "perf_hooks"; +import { bindTaskQ } from "../queues/crawl/handle"; import { qWebsiteWorker } from "../queues/crawl"; import { crawlTrackingEmitter } from "./emitters/crawl"; import { domainName } from "../core/utils"; @@ -8,44 +9,67 @@ import type { ScanRpcCall } from "../proto/calls/scan-stream"; // handle hostname assign from domain or pages const extractHostname = (domain?: string, pages?: string[]) => { const target = pages && pages.length === 1 ? pages[0] : domain; - return domainName(getHostName(target)); }; // get a key for the event based on domain and uid. -export const getKey = (domain, pages, user_id) => { - return `${extractHostname(domain, pages)}-${user_id || 0}`; -}; +export const getKey = (domain, pages, user_id) => + `${extractHostname(domain, pages)}-${user_id || 0}`; // remove key from object export const removeKey = (key, { [key]: _, ...rest }) => rest; -/* Emit events to track crawling progress. - * This mainly tracks at a higher level the progress between the gRPC crawling across modules. - * TODO: allow configuring a url and passing in optional Promise handling. - * @param url: scope the events to track one domain - */ -export const establishCrawlTracking = () => { - // track when a new website starts and determine page completion - let crawlingSet = {}; +// track when a new website starts and determine page completion +let crawlingSet = {}; - crawlTrackingEmitter.on("crawl-start", (target) => { +// init crawling +const crawlStart = (target) => { + setImmediate(() => { const key = getKey(target.domain, target.pages, target.user_id); - // set the item for tracking if (!crawlingSet[key]) { crawlingSet[key] = { total: 0, current: 0, crawling: true, - duration: performance.now(), shutdown: false, + duration: performance.now(), + event: bindTaskQ(Math.max(Object.keys(crawlingSet).length, 1)), }; } }); +}; - // track total amount of pages in a website via gRPC. 
- crawlTrackingEmitter.on("crawl-processing", async (call: ScanRpcCall) => { +// de-init crawling +const crawlComplete = (target) => { + setImmediate(async () => { + const userId = target.user_id; + const key = getKey(target.domain, target.pages, userId); + + if (crawlingSet[key]) { + crawlingSet[key].crawling = false; + + if (crawlingSet[key].current === crawlingSet[key].total) { + crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); + await qWebsiteWorker.push({ + userId, + meta: { + extra: { + domain: extractHostname(target.domain), + duration: performance.now() - crawlingSet[key].duration, + shutdown: crawlingSet[key].shutdown, + }, + }, + }); + crawlingSet = removeKey(key, crawlingSet); // remove after completion + } + } + }); +}; + +// gRPC call +const crawlProcessing = (call: ScanRpcCall) => { + setImmediate(async () => { const target = call.request; const key = getKey(target.domain, target.pages, target.user_id); // process a new item tracking count @@ -56,29 +80,27 @@ export const establishCrawlTracking = () => { if (crawlingSet[key].shutdown) { call.write({ message: "shutdown" }); crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); - try { - await qWebsiteWorker.push({ - userId: target.user_id, - meta: { - extra: { - domain: extractHostname(target.domain), - duration: performance.now() - crawlingSet[key].duration, - shutdown: true, - }, + await qWebsiteWorker.push({ + userId: target.user_id, + meta: { + extra: { + domain: extractHostname(target.domain), + duration: performance.now() - crawlingSet[key].duration, + shutdown: true, }, - }); - } catch (e) { - console.error(e); - } + }, + }); crawlingSet = removeKey(key, crawlingSet); } } call.end(); }); +}; - // track the amount of pages the website should have and determine if complete. - crawlTrackingEmitter.on("crawl-processed", async (target) => { +// crawl finished processing the page +const crawlProcessed = (target) => { + setImmediate(async () => { // process a new item tracking count const userId = target.user_id; const key = getKey(target.domain, target.pages, userId); @@ -97,51 +119,36 @@ export const establishCrawlTracking = () => { !crawlingSet[key].crawling ) { crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); - try { - await qWebsiteWorker.push({ - userId, - meta: { - extra: { - domain: extractHostname(target.domain), - duration: performance.now() - crawlingSet[key].duration, - shutdown: crawlingSet[key].shutdown, - }, + await qWebsiteWorker.push({ + userId, + meta: { + extra: { + domain: extractHostname(target.domain), + duration: performance.now() - crawlingSet[key].duration, + shutdown: crawlingSet[key].shutdown, }, - }); - } catch (e) { - console.error(e); - } + }, + }); crawlingSet = removeKey(key, crawlingSet); // Crawl completed } } }); +}; +/* Emit events to track crawling progress. + * This mainly tracks at a higher level the progress between the gRPC crawling across modules. + * TODO: allow configuring a url and passing in optional Promise handling. + * @param url: scope the events to track one domain + */ +export const establishCrawlTracking = () => { + // track when crawl has started + crawlTrackingEmitter.on("crawl-start", crawlStart); // track when the crawler has processed the pages and sent. 
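For context on the stream framing in crawl-stream-slim.ts and crawl-stream.ts above: each page result is written as JSON.stringify(...) followed by a comma, and clients identifying as a11ywatch_cli/ receive one empty { url: "", domain: "" } record at the end so the trailing comma always has a follower. A rough consumer-side sketch under that assumption (not shipped code; the surrounding bracket wrapping is assumed to happen on the client):

```ts
// Assumes the response body is a comma-separated run of JSON objects,
// optionally ending with the empty sentinel record written for CLI clients.
const parseCrawlStream = (raw: string): Array<{ url?: string; domain?: string }> => {
  const body = raw.endsWith(",") ? raw.slice(0, -1) : raw;
  const items = JSON.parse(`[${body}]`) as Array<{ url?: string; domain?: string }>;
  // drop the sentinel appended purely for trailing-comma handling
  return items.filter((item) => item.url !== "" || item.domain !== "");
};
```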
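The crawl-tracking refactor keeps one record per `${domain}-${userId}` key: crawl-processing increments total, crawl-processed increments current, and crawl-complete clears crawling, with the per-key crawl-complete-… event firing once the counters meet and the crawler has stopped. A minimal model of that bookkeeping (a reading aid, not the module itself):

```ts
type Tracking = {
  total: number; // pages announced by the crawler (crawl-processing)
  current: number; // pages fully processed by core (crawl-processed)
  crawling: boolean; // crawler still discovering pages (cleared by crawl-complete)
};

const isFinished = (t: Tracking) => !t.crawling && t.current === t.total;

// event handlers expressed as pure transitions
const onProcessing = (t: Tracking): Tracking => ({ ...t, total: t.total + 1 });
const onProcessed = (t: Tracking): Tracking => ({ ...t, current: t.current + 1 });
const onCrawlerDone = (t: Tracking): Tracking => ({ ...t, crawling: false });
```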
- crawlTrackingEmitter.on("crawl-complete", async (target) => { - const userId = target.user_id; - const key = getKey(target.domain, target.pages, userId); - - if (crawlingSet[key]) { - crawlingSet[key].crawling = false; - - if (crawlingSet[key].current === crawlingSet[key].total) { - crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); - try { - await qWebsiteWorker.push({ - userId, - meta: { - extra: { - domain: extractHostname(target.domain), - duration: performance.now() - crawlingSet[key].duration, - shutdown: crawlingSet[key].shutdown, - }, - }, - }); - } catch (e) { - console.error(e); - } - crawlingSet = removeKey(key, crawlingSet); // remove after completion - } - } - }); + crawlTrackingEmitter.on("crawl-complete", crawlComplete); + // track total amount of pages in a website via gRPC. + crawlTrackingEmitter.on("crawl-processing", crawlProcessing); + // track the amount of pages the website should have and determine if complete. + crawlTrackingEmitter.on("crawl-processed", crawlProcessed); }; + +export { crawlingSet }; diff --git a/src/proto/calls/core-crawl.ts b/src/proto/calls/core-crawl.ts index 3f499ec7..548a2072 100644 --- a/src/proto/calls/core-crawl.ts +++ b/src/proto/calls/core-crawl.ts @@ -3,7 +3,6 @@ import { incrementApiByUser } from "../../core/controllers/users/find/get-api"; import { getCrawlConfig } from "../../core/streams/crawl-config"; import { watcherCrawl } from "../../core/actions/accessibility/watcher_crawl"; import { crawlEmitter, crawlTrackingEmitter } from "../../event"; -import { getKey } from "../../event/crawl-tracking"; import { domainName } from "../../core/utils/domain-name"; import { getHostName } from "../../core/utils/get-host"; import type { CrawlProps } from "../../core/utils/crawl-stream"; @@ -13,7 +12,7 @@ type ServerCallStreaming = ServerWritableStream< {} >; -// core multi page streaming gRPC scanning +// core multi page streaming gRPC export const coreCrawl = async (call: ServerCallStreaming) => { const { authorization, url, subdomains, tld } = call.request; const userNext = await incrementApiByUser(authorization); @@ -51,29 +50,35 @@ export const crawlStreaming = ( }); }); - return new Promise((resolve) => { - const domain = getHostName(url); - const crawlKey = `${domainName(domain)}-${userId || 0}`; - const crawlEvent = `crawl-${crawlKey}`; + const domain = getHostName(url); + const crawlKey = `${domainName(domain)}-${userId || 0}`; + const crawlEvent = `crawl-${crawlKey}`; + return new Promise((resolve) => { const crawlListener = (source) => { - const data = source?.data; + setImmediate(() => { + const data = source?.data; + + if (data) { + data.pageLoadTime = null; + data.issues = null; + call.write({ data }); + } + }); + }; - if (data) { - data.pageLoadTime = null; - data.issues = null; - call.write({ data }); - } + const crawlCompleteListener = (data) => { + setImmediate(() => { + crawlEmitter.off(crawlEvent, crawlListener); + resolve(data); + }); }; crawlEmitter.on(crawlEvent, crawlListener); crawlTrackingEmitter.once( - `crawl-complete-${getKey(domain, undefined, userId)}`, - (data) => { - crawlEmitter.off(crawlEvent, crawlListener); - resolve(data); - } + `crawl-complete-${crawlKey}`, + crawlCompleteListener ); }); }; diff --git a/src/proto/calls/scan-end.ts b/src/proto/calls/scan-end.ts index e9fb95e7..56a7b266 100644 --- a/src/proto/calls/scan-end.ts +++ b/src/proto/calls/scan-end.ts @@ -7,8 +7,11 @@ export const scanEnd = async ( call: ServerWritableStream<{ domain: string; user_id: number }, {}>, callback: sendUnaryData 
) => { + process.nextTick(() => { + crawlTrackingEmitter.emit("crawl-complete", call.request); + }); + await crawlTrackerComplete(call.request); // TODO: remove - fully handled via events - crawlTrackingEmitter.emit("crawl-complete", call.request); callback(null, {}); }; diff --git a/src/proto/calls/scan-start.ts b/src/proto/calls/scan-start.ts index ac6d984c..70580ea0 100644 --- a/src/proto/calls/scan-start.ts +++ b/src/proto/calls/scan-start.ts @@ -6,7 +6,9 @@ export const scanStart = async ( call: ServerWritableStream<{ domain: string; user_id: number }, {}>, callback: sendUnaryData ) => { - crawlTrackingEmitter.emit("crawl-start", call.request); + process.nextTick(() => { + crawlTrackingEmitter.emit("crawl-start", call.request); + }); callback(null, {}); }; diff --git a/src/proto/calls/scan-stream.ts b/src/proto/calls/scan-stream.ts index dbc21bfa..06146290 100644 --- a/src/proto/calls/scan-stream.ts +++ b/src/proto/calls/scan-stream.ts @@ -1,6 +1,6 @@ import type { ServerWritableStream } from "@grpc/grpc-js"; +import { crawlEnqueue } from "../../queues/crawl/crawl"; import { crawlTrackingEmitter } from "../../event"; -import { crawlEnqueue } from "../../queues/crawl"; type ScanParams = { pages: string[]; @@ -13,7 +13,10 @@ export type ScanRpcCall = ServerWritableStream; // perform scan via streams enqueueing scan export const scanStream = async (call: ScanRpcCall) => { - crawlTrackingEmitter.emit("crawl-processing", call); // pass in call to determine if crawl needs to stop + process.nextTick(() => { + crawlTrackingEmitter.emit("crawl-processing", call); // pass in call to determine if crawl needs to stop + }); + // TODO: remove queue for improved browser page handling puppeteer await crawlEnqueue(call.request); // queue to control output. }; diff --git a/src/queues/crawl/crawl.ts b/src/queues/crawl/crawl.ts index c1d75df7..26a6b7a2 100644 --- a/src/queues/crawl/crawl.ts +++ b/src/queues/crawl/crawl.ts @@ -1,3 +1,4 @@ +import { crawlingSet, getKey } from "../../event/crawl-tracking"; import { q } from "./handle"; /* @@ -9,16 +10,21 @@ export const crawlEnqueue = async (data: { user_id: number; }) => { const { pages = [], user_id } = data; + const key = getKey(null, pages, user_id); + const event = crawlingSet[key] && crawlingSet[key].event; // get users for crawl job matching the urls for (const url of pages) { - await q - .push({ + if (event) { + await event.unshift({ url, userId: user_id, - }) - .catch((e) => { - console.error(e); }); + } else { + await q.unshift({ + url, + userId: user_id, + }); + } } }; diff --git a/src/queues/crawl/handle.ts b/src/queues/crawl/handle.ts index 667859bb..662ab243 100644 --- a/src/queues/crawl/handle.ts +++ b/src/queues/crawl/handle.ts @@ -42,12 +42,20 @@ if ( ) { cwLimit = Number(process.env.CRAWL_QUEUE_LIMIT); } else { - cwLimit = Math.max(8 * (cpus().length || 1), 4); + cwLimit = Math.max(5 * (cpus().length || 1), 4); } // crawl queue handler export const q: queueAsPromised = fastq.promise(asyncWorker, cwLimit); +// current worker limit +export const getCWLimit = (limit = 8) => + Math.max(Math.floor(cwLimit / limit), 1); + +// bind the fastq to a function +export const bindTaskQ = (limit = 8): queueAsPromised => + fastq.promise(asyncWorker.bind({}), getCWLimit(limit)); + // determine when crawl completed. 
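On the bindTaskQ and getCWLimit additions in src/queues/crawl/handle.ts above: each crawl registered in crawlingSet gets its own fastq queue, and the shared concurrency budget is divided by the number of crawls currently tracked so one large site cannot monopolise the workers. A sketch of that idea, assuming fastq's promise API and a placeholder worker (names mirror the patch, but this is not the exact implementation):

```ts
import * as fastq from "fastq";
import type { queueAsPromised } from "fastq";
import { cpus } from "node:os";

type CrawlTask = { url: string; userId: number };

// base concurrency: 5 workers per CPU, at least 4 (per the patch)
const cwLimit =
  Number(process.env.CRAWL_QUEUE_LIMIT) || Math.max(5 * (cpus().length || 1), 4);

const asyncWorker = async (task: CrawlTask): Promise<void> => {
  // the real worker runs the accessibility scan for the page
  console.log("scan", task.url, task.userId);
};

// share the budget across however many crawls are active, never below 1
export const getCWLimit = (activeCrawls = 8): number =>
  Math.max(Math.floor(cwLimit / activeCrawls), 1);

// a dedicated queue for one crawl, sized by the current number of crawls
export const bindTaskQ = (activeCrawls = 8): queueAsPromised<CrawlTask> =>
  fastq.promise(asyncWorker, getCWLimit(activeCrawls));
```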
export const qWebsiteWorker: queueAsPromised = fastq.promise( asyncWorkerCrawlComplete, diff --git a/src/queues/crawl/index.ts b/src/queues/crawl/index.ts index 2555f607..29c5c582 100644 --- a/src/queues/crawl/index.ts +++ b/src/queues/crawl/index.ts @@ -1,2 +1 @@ -export { crawlEnqueue } from "./crawl"; export { q, qWebsiteWorker } from "./handle";
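Finally, crawlEnqueue is now imported from ./crawl directly instead of being re-exported from src/queues/crawl/index.ts, and it looks up the per-crawl queue registered at crawl start via getKey(null, pages, user_id), falling back to the shared q when none exists; unshift places the pages at the head of the queue so in-flight crawls drain first. A condensed sketch of that routing (illustrative; the parameter shapes are assumptions):

```ts
import type { queueAsPromised } from "fastq";

type CrawlTask = { url: string; userId: number };
type CrawlRecord = { event?: queueAsPromised<CrawlTask> };

// record is the crawlingSet entry for this crawl (if any), sharedQ is the
// fallback queue exported from src/queues/crawl/handle
export const enqueuePages = async (
  pages: string[],
  userId: number,
  record: CrawlRecord | undefined,
  sharedQ: queueAsPromised<CrawlTask>
) => {
  const target = record?.event ?? sharedQ;
  for (const url of pages) {
    // unshift queues the page at the head rather than the tail
    await target.unshift({ url, userId });
  }
};
```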