chore(stream): fix event cleanup on streams
j-mendez committed Sep 4, 2022
1 parent e518a90 commit 2530fe8
Showing 8 changed files with 70 additions and 56 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@a11ywatch/core",
"version": "0.5.47",
"version": "0.5.48",
"description": "a11ywatch central api",
"main": "./server.js",
"scripts": {
31 changes: 19 additions & 12 deletions src/core/utils/crawl-stream-slim.ts
@@ -29,8 +29,9 @@ export const crawlHttpStreamSlim = (

return new Promise((resolve) => {
const domain = getHostName(url);
const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`;

crawlEmitter.on(`crawl-${domainName(domain)}-${userId || 0}`, (source) => {
const crawlListener = (source) => {
const data = source?.data;

// only send when true
@@ -42,20 +43,26 @@ }
}
res.raw.write(`${JSON.stringify(data)},`);
}
});
};

crawlTrackingEmitter.once(
`crawl-complete-${getKey(domain, undefined, userId)}`,
() => {
if (client && client.includes("a11ywatch_cli/")) {
// send extra item for trailing comma handler
res.raw.write(`${JSON.stringify({ url: "", domain: "" })}`, () => {
resolve(true);
});
} else {
crawlEmitter.on(crawlEvent, crawlListener);

const crawlComplete = () => {
crawlTrackingEmitter.off(crawlEvent, crawlListener);

if (client && client.includes("a11ywatch_cli/")) {
// send extra item for trailing comma handler
res.raw.write(`${JSON.stringify({ url: "", domain: "" })}`, () => {
resolve(true);
}
});
} else {
resolve(true);
}
};

crawlTrackingEmitter.once(
`crawl-complete-${getKey(domain, undefined, userId)}`,
crawlComplete
);
});
};
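
The crawl-stream-slim.ts change above captures the core of the fix: the per-page handler gets a name, is registered with `on`, and is detached with `off` inside the one-shot completion handler instead of staying attached to a shared emitter after the response resolves. A minimal sketch of that cleanup pattern with Node's `EventEmitter` (the emitter, event names, and helper below are stand-ins for the shared singletons used in the diff, with the listener removed from the same emitter it was registered on):

```ts
import { EventEmitter } from "events";

// Stand-ins for the shared crawl emitters used across the stream handlers.
const crawlEmitter = new EventEmitter();
const crawlTrackingEmitter = new EventEmitter();

// Forward page events until the completion event fires, then detach the
// listener so it cannot accumulate on the long-lived emitter between requests.
export const streamUntilComplete = (crawlEvent: string, completeEvent: string) =>
  new Promise<boolean>((resolve) => {
    const crawlListener = (source?: { data?: unknown }) => {
      if (source?.data) {
        // write the page result to the response stream here
      }
    };

    crawlEmitter.on(crawlEvent, crawlListener);

    crawlTrackingEmitter.once(completeEvent, () => {
      crawlEmitter.off(crawlEvent, crawlListener); // same emitter the listener was added to
      resolve(true);
    });
  });
```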
10 changes: 8 additions & 2 deletions src/core/utils/crawl-stream.ts
@@ -35,7 +35,7 @@ export const crawlHttpStream = (
return new Promise((resolve) => {
const domain = getHostName(url);

crawlEmitter.on(`crawl-${domainName(domain)}-${userId || 0}`, (source) => {
const crawlListener = (source) => {
const data = source?.data;
if (data) {
const issuesFound = data?.issues?.length;
@@ -51,11 +51,17 @@
})},`
);
}
});
};

const crawlEvent = `crawl-${domainName(domain)}-${userId || 0}`;

crawlEmitter.on(crawlEvent, crawlListener);

crawlTrackingEmitter.once(
`crawl-complete-${getKey(domain, undefined, userId)}`,
() => {
crawlTrackingEmitter.off(crawlEvent, crawlListener);

// send extra item for trailing comma handler non rpc
if (client && client.includes("a11ywatch_cli/")) {
res.raw.write(
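
Both stream handlers write each page result as `JSON.stringify(data)` followed by a comma, and when the client identifies as `a11ywatch_cli/` they append a final empty `{ url: "", domain: "" }` item so the accumulated body parses as JSON once wrapped in brackets. A rough sketch of what that consumer side could look like (the endpoint and field names are illustrative, not part of this commit):

```ts
// Hypothetical consumer of the comma-delimited crawl stream described above.
export const readCrawlStream = async (endpoint: string) => {
  const res = await fetch(endpoint);
  const body = await res.text(); // e.g. {"url":"a"},{"url":"b"},{"url":"","domain":""}

  // Wrapping in brackets turns the comma-separated chunks into a JSON array;
  // the empty final item lets the last real entry keep its trailing comma.
  const items: Array<{ url: string; domain?: string }> = JSON.parse(`[${body}]`);
  return items.filter((item) => item.url !== "");
};
```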
41 changes: 22 additions & 19 deletions src/event/crawl-tracking.ts
@@ -3,6 +3,7 @@ import { performance } from "perf_hooks";
import { qWebsiteWorker } from "../queues/crawl";
import { crawlTrackingEmitter } from "./emitters/crawl";
import { domainName } from "../core/utils";
import type { ScanRpcCall } from "../proto/calls/scan-stream";

// handle hostname assign from domain or pages
const extractHostname = (domain?: string, pages?: string[]) => {
@@ -44,7 +45,7 @@ export const establishCrawlTracking = () => {
});

// track total amount of pages in a website via gRPC.
crawlTrackingEmitter.on("crawl-processing", (call) => {
crawlTrackingEmitter.on("crawl-processing", async (call: ScanRpcCall) => {
const target = call.request;
const key = getKey(target.domain, target.pages, target.user_id); // process a new item tracking count

@@ -55,8 +56,8 @@
if (crawlingSet[key].shutdown) {
call.write({ message: "shutdown" });
crawlTrackingEmitter.emit(`crawl-complete-${key}`, target);
qWebsiteWorker
.push({
try {
await qWebsiteWorker.push({
userId: target.user_id,
meta: {
extra: {
@@ -65,21 +66,19 @@
shutdown: true,
},
},
})
.catch((err) => console.error(err));
});
} catch (e) {
console.error(e);
}
crawlingSet = removeKey(key, crawlingSet);
} else {
call.write({ message: "" });
}
} else {
call.write({ message: "" });
}

call.end();
});

// track the amount of pages the website should have and determine if complete.
crawlTrackingEmitter.on("crawl-processed", (target) => {
crawlTrackingEmitter.on("crawl-processed", async (target) => {
// process a new item tracking count
const userId = target.user_id;
const key = getKey(target.domain, target.pages, userId);
@@ -98,8 +97,8 @@
!crawlingSet[key].crawling
) {
crawlTrackingEmitter.emit(`crawl-complete-${key}`, target);
qWebsiteWorker
.push({
try {
await qWebsiteWorker.push({
userId,
meta: {
extra: {
Expand All @@ -108,15 +107,17 @@ export const establishCrawlTracking = () => {
shutdown: crawlingSet[key].shutdown,
},
},
})
.catch((err) => console.error(err));
});
} catch (e) {
console.error(e);
}
crawlingSet = removeKey(key, crawlingSet); // Crawl completed
}
}
});

// track when the crawler has processed the pages and sent.
crawlTrackingEmitter.on("crawl-complete", (target) => {
crawlTrackingEmitter.on("crawl-complete", async (target) => {
const userId = target.user_id;
const key = getKey(target.domain, target.pages, userId);

@@ -125,8 +126,8 @@

if (crawlingSet[key].current === crawlingSet[key].total) {
crawlTrackingEmitter.emit(`crawl-complete-${key}`, target);
qWebsiteWorker
.push({
try {
await qWebsiteWorker.push({
userId,
meta: {
extra: {
Expand All @@ -135,8 +136,10 @@ export const establishCrawlTracking = () => {
shutdown: crawlingSet[key].shutdown,
},
},
})
.catch((err) => console.error(err));
});
} catch (e) {
console.error(e);
}
crawlingSet = removeKey(key, crawlingSet); // remove after completion
}
}
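
In crawl-tracking.ts the completion paths also switch from fire-and-forget `qWebsiteWorker.push(...).catch(...)` calls to `await` inside `try`/`catch`, so each handler finishes (or logs a failed) queue push before the entry is dropped from the tracking set. A condensed sketch of the before/after shape, with the queue stubbed out:

```ts
// Stand-in for the fastq-backed website worker queue used by the handlers above.
const qWebsiteWorker = {
  push: async (payload: { userId?: number }): Promise<void> => {
    void payload; // enqueue the completion work here
  },
};

// Before: the push ran in the background; cleanup could happen before it settled.
export const notifyLegacy = (userId?: number) => {
  qWebsiteWorker.push({ userId }).catch((err) => console.error(err));
  // tracking entry removed here
};

// After: the push is awaited, so failures are logged and cleanup only
// happens once the push has settled.
export const notify = async (userId?: number) => {
  try {
    await qWebsiteWorker.push({ userId });
  } catch (e) {
    console.error(e);
  }
  // tracking entry removed here
};
```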
15 changes: 10 additions & 5 deletions src/proto/calls/core-crawl.ts
@@ -53,22 +53,27 @@ export const crawlStreaming = (

return new Promise((resolve) => {
const domain = getHostName(url);
const crawlKey = `${domainName(domain)}-${userId || 0}`;
const crawlEvent = `crawl-${crawlKey}`;

crawlEmitter.on(`crawl-${domainName(domain)}-${userId || 0}`, (source) => {
const crawlListener = (source) => {
const data = source?.data;

// only send when true
if (data) {
// trim data for sending minimal
data.pageLoadTime = null;
data.issues = null;
call.write({ data });
}
});
};

crawlEmitter.on(crawlEvent, crawlListener);

crawlTrackingEmitter.once(
`crawl-complete-${getKey(domain, undefined, userId)}`,
resolve
(data) => {
crawlEmitter.off(crawlEvent, crawlListener);
resolve(data);
}
);
});
};
9 changes: 4 additions & 5 deletions src/proto/calls/scan-stream.ts
@@ -9,12 +9,11 @@ type ScanParams = {
full: boolean;
};

export type ScanRpcCall = ServerWritableStream<ScanParams, {}>;

// perform scan via streams enqueueing scan
export const scanStream = async (
call: ServerWritableStream<ScanParams, {}>
) => {
export const scanStream = async (call: ScanRpcCall) => {
crawlTrackingEmitter.emit("crawl-processing", call); // pass in call to determine if crawl needs to stop
await crawlEnqueue(call.request); // queue to control output.

call.end();
await crawlEnqueue(call.request); // queue to control output.
};
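
scan-stream.ts now exports the `ScanRpcCall` alias so crawl-tracking.ts can type the gRPC call it receives over the `crawl-processing` event, and the RPC handler no longer ends the stream itself; the tracking listener writes the status message and calls `call.end()`. A stripped-down sketch of that handoff (the request and response shapes are placeholders):

```ts
import { EventEmitter } from "events";
import type { ServerWritableStream } from "@grpc/grpc-js";

type ScanParams = { domain?: string; pages?: string[]; user_id?: number };
export type ScanRpcCall = ServerWritableStream<ScanParams, { message?: string }>;

const crawlTrackingEmitter = new EventEmitter();

// The RPC handler only hands the call off and enqueues the scan.
export const scanStream = async (call: ScanRpcCall) => {
  crawlTrackingEmitter.emit("crawl-processing", call);
  // await crawlEnqueue(call.request) runs here in the real handler
};

// The tracking side decides what to write and when to end the stream,
// e.g. "shutdown" when the crawl for this key should stop.
crawlTrackingEmitter.on("crawl-processing", (call: ScanRpcCall) => {
  call.write({ message: "" });
  call.end();
});
```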
10 changes: 2 additions & 8 deletions src/queues/crawl/handle.ts
@@ -4,7 +4,6 @@ import { cpus } from "os";
import { crawlWebsite } from "../../core/actions";
import { setWebsiteScore } from "../../core/utils/stats/score";
import type { Method } from "../../database/config";
import type { ResponseModel } from "../../core/models/response/types";

interface Meta {
method?: Method;
@@ -17,11 +16,6 @@ type Task = {
meta?: Meta;
};

// the async worker to use for crawling pages
async function asyncWorker(arg: Task): Promise<ResponseModel | boolean> {
return await crawlWebsite(arg);
}

// the async worker to use for completed crawl actions. TODO: remove for collection appending raw value to score.
async function asyncWorkerCrawlComplete(arg: Task): Promise<void> {
const { userId, meta } = arg;
@@ -42,11 +36,11 @@ if (
) {
cwLimit = Number(process.env.CRAWL_QUEUE_LIMIT);
} else {
cwLimit = Math.max(4 * (cpus().length || 1), 4);
cwLimit = Math.max(10 * (cpus().length || 1), 4);
}

// crawl queue handler
export const q: queueAsPromised<Task> = fastq.promise(asyncWorker, cwLimit);
export const q: queueAsPromised<Task> = fastq.promise(crawlWebsite, cwLimit);

// determine when crawl completed.
export const qWebsiteWorker: queueAsPromised<Task> = fastq.promise(
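
handle.ts drops the pass-through `asyncWorker` wrapper, passes `crawlWebsite` straight to `fastq.promise`, and raises the default concurrency from four to ten tasks per CPU core (still at least four) when `CRAWL_QUEUE_LIMIT` is unset. A small sketch of that queue setup, assuming a simplified task shape and worker:

```ts
import { cpus } from "os";
import * as fastq from "fastq";
import type { queueAsPromised } from "fastq";

type Task = { url?: string; userId?: number };

// Simplified stand-in for crawlWebsite; the real worker performs the page scan.
const crawlWebsite = async (task: Task): Promise<boolean> => Boolean(task.url);

// Honor CRAWL_QUEUE_LIMIT when set, otherwise 10 tasks per core, minimum 4.
const cwLimit = process.env.CRAWL_QUEUE_LIMIT
  ? Number(process.env.CRAWL_QUEUE_LIMIT)
  : Math.max(10 * (cpus().length || 1), 4);

// Passing the worker directly avoids the extra async wrapper removed above.
export const q: queueAsPromised<Task> = fastq.promise(crawlWebsite, cwLimit);

// Usage: await q.push({ url: "https://example.com" });
```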
8 changes: 4 additions & 4 deletions src/web/routes/data/website.ts
@@ -62,10 +62,10 @@ export const getWebsiteReport = async (
req: FastifyContext["request"],
res: FastifyContext["reply"]
) => {
const q = paramParser(req, "q");
const url = paramParser(req, "url");
const pageUrl = paramParser(req, "pageUrl");
const slug = q || url || pageUrl;
const slug =
paramParser(req, "q") ||
paramParser(req, "url") ||
paramParser(req, "pageUrl");

if (!slug) {
res.status(404);
