diff --git a/package.json b/package.json
index 6fdb6853..b77a1aff 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@a11ywatch/core",
-  "version": "0.5.47",
+  "version": "0.5.48",
   "description": "a11ywatch central api",
   "main": "./server.js",
   "scripts": {
diff --git a/src/core/utils/crawl-stream-slim.ts b/src/core/utils/crawl-stream-slim.ts
index bd07edbf..57eb45d6 100644
--- a/src/core/utils/crawl-stream-slim.ts
+++ b/src/core/utils/crawl-stream-slim.ts
@@ -29,8 +29,9 @@ export const crawlHttpStreamSlim = (
   return new Promise((resolve) => {
     const domain = getHostName(url);
+    const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`;
 
-    crawlEmitter.on(`crawl-${domainName(domain)}-${userId || 0}`, (source) => {
+    const crawlListener = (source) => {
       const data = source?.data;
 
       // only send when true
@@ -42,20 +43,26 @@ export const crawlHttpStreamSlim = (
         }
         res.raw.write(`${JSON.stringify(data)},`);
       }
-    });
+    };
 
-    crawlTrackingEmitter.once(
-      `crawl-complete-${getKey(domain, undefined, userId)}`,
-      () => {
-        if (client && client.includes("a11ywatch_cli/")) {
-          // send extra item for trailing comma handler
-          res.raw.write(`${JSON.stringify({ url: "", domain: "" })}`, () => {
-            resolve(true);
-          });
-        } else {
+    crawlEmitter.on(crawlEvent, crawlListener);
+
+    const crawlComplete = () => {
+      crawlTrackingEmitter.off(crawlEvent, crawlListener);
+
+      if (client && client.includes("a11ywatch_cli/")) {
+        // send extra item for trailing comma handler
+        res.raw.write(`${JSON.stringify({ url: "", domain: "" })}`, () => {
          resolve(true);
-        }
+        });
+      } else {
+        resolve(true);
       }
+    };
+
+    crawlTrackingEmitter.once(
+      `crawl-complete-${getKey(domain, undefined, userId)}`,
+      crawlComplete
     );
   });
 };
diff --git a/src/core/utils/crawl-stream.ts b/src/core/utils/crawl-stream.ts
index 24d0f9bc..6a5304d1 100644
--- a/src/core/utils/crawl-stream.ts
+++ b/src/core/utils/crawl-stream.ts
@@ -35,7 +35,7 @@ export const crawlHttpStream = (
   return new Promise((resolve) => {
     const domain = getHostName(url);
 
-    crawlEmitter.on(`crawl-${domainName(domain)}-${userId || 0}`, (source) => {
+    const crawlListener = (source) => {
       const data = source?.data;
       if (data) {
         const issuesFound = data?.issues?.length;
@@ -51,11 +51,17 @@ export const crawlHttpStream = (
           })},`
         );
       }
-    });
+    };
+
+    const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`;
+
+    crawlEmitter.on(crawlEvent, crawlListener);
 
     crawlTrackingEmitter.once(
       `crawl-complete-${getKey(domain, undefined, userId)}`,
       () => {
+        crawlTrackingEmitter.off(crawlEvent, crawlListener);
+
        // send extra item for trailing comma handler non rpc
        if (client && client.includes("a11ywatch_cli/")) {
          res.raw.write(
diff --git a/src/event/crawl-tracking.ts b/src/event/crawl-tracking.ts
index 6c46a03d..7be38ae6 100644
--- a/src/event/crawl-tracking.ts
+++ b/src/event/crawl-tracking.ts
@@ -3,6 +3,7 @@ import { performance } from "perf_hooks";
 import { qWebsiteWorker } from "../queues/crawl";
 import { crawlTrackingEmitter } from "./emitters/crawl";
 import { domainName } from "../core/utils";
+import type { ScanRpcCall } from "../proto/calls/scan-stream";
 
 // handle hostname assign from domain or pages
 const extractHostname = (domain?: string, pages?: string[]) => {
@@ -44,7 +45,7 @@ export const establishCrawlTracking = () => {
   });
 
   // track total amount of pages in a website via gRPC.
- crawlTrackingEmitter.on("crawl-processing", (call) => { + crawlTrackingEmitter.on("crawl-processing", async (call: ScanRpcCall) => { const target = call.request; const key = getKey(target.domain, target.pages, target.user_id); // process a new item tracking count @@ -55,8 +56,8 @@ export const establishCrawlTracking = () => { if (crawlingSet[key].shutdown) { call.write({ message: "shutdown" }); crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); - qWebsiteWorker - .push({ + try { + await qWebsiteWorker.push({ userId: target.user_id, meta: { extra: { @@ -65,21 +66,19 @@ export const establishCrawlTracking = () => { shutdown: true, }, }, - }) - .catch((err) => console.error(err)); + }); + } catch (e) { + console.error(e); + } crawlingSet = removeKey(key, crawlingSet); - } else { - call.write({ message: "" }); } - } else { - call.write({ message: "" }); } call.end(); }); // track the amount of pages the website should have and determine if complete. - crawlTrackingEmitter.on("crawl-processed", (target) => { + crawlTrackingEmitter.on("crawl-processed", async (target) => { // process a new item tracking count const userId = target.user_id; const key = getKey(target.domain, target.pages, userId); @@ -98,8 +97,8 @@ export const establishCrawlTracking = () => { !crawlingSet[key].crawling ) { crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); - qWebsiteWorker - .push({ + try { + await qWebsiteWorker.push({ userId, meta: { extra: { @@ -108,15 +107,17 @@ export const establishCrawlTracking = () => { shutdown: crawlingSet[key].shutdown, }, }, - }) - .catch((err) => console.error(err)); + }); + } catch (e) { + console.error(e); + } crawlingSet = removeKey(key, crawlingSet); // Crawl completed } } }); // track when the crawler has processed the pages and sent. 
- crawlTrackingEmitter.on("crawl-complete", (target) => { + crawlTrackingEmitter.on("crawl-complete", async (target) => { const userId = target.user_id; const key = getKey(target.domain, target.pages, userId); @@ -125,8 +126,8 @@ export const establishCrawlTracking = () => { if (crawlingSet[key].current === crawlingSet[key].total) { crawlTrackingEmitter.emit(`crawl-complete-${key}`, target); - qWebsiteWorker - .push({ + try { + await qWebsiteWorker.push({ userId, meta: { extra: { @@ -135,8 +136,10 @@ export const establishCrawlTracking = () => { shutdown: crawlingSet[key].shutdown, }, }, - }) - .catch((err) => console.error(err)); + }); + } catch (e) { + console.error(e); + } crawlingSet = removeKey(key, crawlingSet); // remove after completion } } diff --git a/src/proto/calls/core-crawl.ts b/src/proto/calls/core-crawl.ts index 486d75e3..3f499ec7 100644 --- a/src/proto/calls/core-crawl.ts +++ b/src/proto/calls/core-crawl.ts @@ -53,22 +53,27 @@ export const crawlStreaming = ( return new Promise((resolve) => { const domain = getHostName(url); + const crawlKey = `${domainName(domain)}-${userId || 0}`; + const crawlEvent = `crawl-${crawlKey}`; - crawlEmitter.on(`crawl-${domainName(domain)}-${userId || 0}`, (source) => { + const crawlListener = (source) => { const data = source?.data; - // only send when true if (data) { - // trim data for sending miniaml data.pageLoadTime = null; data.issues = null; call.write({ data }); } - }); + }; + + crawlEmitter.on(crawlEvent, crawlListener); crawlTrackingEmitter.once( `crawl-complete-${getKey(domain, undefined, userId)}`, - resolve + (data) => { + crawlEmitter.off(crawlEvent, crawlListener); + resolve(data); + } ); }); }; diff --git a/src/proto/calls/scan-stream.ts b/src/proto/calls/scan-stream.ts index 21eb77fd..dbc21bfa 100644 --- a/src/proto/calls/scan-stream.ts +++ b/src/proto/calls/scan-stream.ts @@ -9,12 +9,11 @@ type ScanParams = { full: boolean; }; +export type ScanRpcCall = ServerWritableStream; + // perform scan via streams enqueueing scan -export const scanStream = async ( - call: ServerWritableStream -) => { +export const scanStream = async (call: ScanRpcCall) => { crawlTrackingEmitter.emit("crawl-processing", call); // pass in call to determine if crawl needs to stop - await crawlEnqueue(call.request); // queue to control output. - call.end(); + await crawlEnqueue(call.request); // queue to control output. }; diff --git a/src/queues/crawl/handle.ts b/src/queues/crawl/handle.ts index f85583b9..07464b08 100644 --- a/src/queues/crawl/handle.ts +++ b/src/queues/crawl/handle.ts @@ -4,7 +4,6 @@ import { cpus } from "os"; import { crawlWebsite } from "../../core/actions"; import { setWebsiteScore } from "../../core/utils/stats/score"; import type { Method } from "../../database/config"; -import type { ResponseModel } from "../../core/models/response/types"; interface Meta { method?: Method; @@ -17,11 +16,6 @@ type Task = { meta?: Meta; }; -// the async worker to use for crawling pages -async function asyncWorker(arg: Task): Promise { - return await crawlWebsite(arg); -} - // the async worker to use for completed crawl actions. TODO: remove for collection appending raw value to score. 
 async function asyncWorkerCrawlComplete(arg: Task): Promise {
   const { userId, meta } = arg;
@@ -42,11 +36,11 @@ if (
 ) {
   cwLimit = Number(process.env.CRAWL_QUEUE_LIMIT);
 } else {
-  cwLimit = Math.max(4 * (cpus().length || 1), 4);
+  cwLimit = Math.max(10 * (cpus().length || 1), 4);
 }
 
 // crawl queue handler
-export const q: queueAsPromised = fastq.promise(asyncWorker, cwLimit);
+export const q: queueAsPromised = fastq.promise(crawlWebsite, cwLimit);
 
 // determine when crawl completed.
 export const qWebsiteWorker: queueAsPromised = fastq.promise(
diff --git a/src/web/routes/data/website.ts b/src/web/routes/data/website.ts
index b128b9c8..b8e2ae7c 100644
--- a/src/web/routes/data/website.ts
+++ b/src/web/routes/data/website.ts
@@ -62,10 +62,10 @@ export const getWebsiteReport = async (
   req: FastifyContext["request"],
   res: FastifyContext["reply"]
 ) => {
-  const q = paramParser(req, "q");
-  const url = paramParser(req, "url");
-  const pageUrl = paramParser(req, "pageUrl");
-  const slug = q || url || pageUrl;
+  const slug =
+    paramParser(req, "q") ||
+    paramParser(req, "url") ||
+    paramParser(req, "pageUrl");
 
   if (!slug) {
     res.status(404);