Skip to content

Commit

Permalink
Fix issue of "too many handles" error when downloading a large blob
Browse files Browse the repository at this point in the history
  • Loading branch information
EmmaZhu committed Jan 15, 2024
1 parent 3f64420 commit 3499794
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 22 deletions.
29 changes: 11 additions & 18 deletions src/common/persistence/FSExtentStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
close,
createReadStream,
createWriteStream,
fdatasync,
mkdir,
Expand Down Expand Up @@ -30,6 +29,7 @@ import IExtentStore, {
} from "./IExtentStore";
import IOperationQueue from "./IOperationQueue";
import OperationQueue from "./OperationQueue";
import FileLazyReadStream from "./FileLazyReadStream";

const statAsync = promisify(stat);
const mkdirAsync = promisify(mkdir);
Expand Down Expand Up @@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore {
const op = () =>
new Promise<NodeJS.ReadableStream>((resolve, reject) => {
this.logger.verbose(
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${
extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${
extentChunk.count
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${extentChunk.count
} end:${extentChunk.offset + extentChunk.count - 1}`,
contextId
);
const stream = createReadStream(path, {
start: extentChunk.offset,
end: extentChunk.offset + extentChunk.count - 1
}).on("close", () => {
this.logger.verbose(
`FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${
extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${
extentChunk.count
} end:${extentChunk.offset + extentChunk.count - 1}`,
contextId
);
});
const stream = new FileLazyReadStream(
path,
extentChunk.offset,
extentChunk.offset + extentChunk.count - 1,
this.logger,
persistencyId,
extentChunk.id,
contextId);
resolve(stream);
});

Expand Down
72 changes: 72 additions & 0 deletions src/common/persistence/FileLazyReadStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { ReadStream, createReadStream } from "fs";
import { Readable } from "stream";
import ILogger from "../ILogger";


export default class FileLazyReadStream extends Readable {
private extentStream: ReadStream | undefined;
constructor(
private readonly extentPath: string,
private readonly start: number,
private readonly end: number,
private readonly logger: ILogger,
private readonly persistencyId: string,
private readonly extentId: string,
private readonly contextId?: string) {
super();
}

public _read(): void {
if (this.extentStream === undefined) {
this.extentStream = createReadStream(this.extentPath, {
start: this.start,
end: this.end
}).on("close", () => {
this.logger.verbose(
`FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId
} path:${this.extentPath} offset:${this.start} end:${this.end}`,
this.contextId
);
});
this.setSourceEventHandlers();
}
this.extentStream?.resume();
}

private setSourceEventHandlers() {
this.extentStream?.on("data", this.sourceDataHandler);
this.extentStream?.on("end", this.sourceErrorOrEndHandler);
this.extentStream?.on("error", this.sourceErrorOrEndHandler);
}

private removeSourceEventHandlers() {
this.extentStream?.removeListener("data", this.sourceDataHandler);
this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler);
this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler);
}

private sourceDataHandler = (data: Buffer) => {
if (!this.push(data)) {
this.extentStream?.pause();
}
}

private sourceErrorOrEndHandler = (err?: Error) => {
if (err && err.name === "AbortError") {
this.destroy(err);
return;
}

this.removeSourceEventHandlers();
this.push(null);
this.destroy(err);
}

_destroy(error: Error | null, callback: (error?: Error) => void): void {
// remove listener from source and release source
//this.removeSourceEventHandlers();
(this.extentStream as Readable).destroy();

callback(error === null ? undefined : error);
}
}
17 changes: 13 additions & 4 deletions tests/testutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ export async function createRandomLocalFile(

ws.on("open", () => {
// tslint:disable-next-line:no-empty
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) {}
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) { }
if (offsetInMB >= blockNumber) {
ws.end();
}
});

ws.on("drain", () => {
// tslint:disable-next-line:no-empty
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) {}
while (offsetInMB++ < blockNumber && ws.write(randomValueHex())) { }
if (offsetInMB >= blockNumber) {
ws.end();
}
Expand All @@ -168,9 +168,18 @@ export async function readStreamToLocalFile(
file: string
) {
return new Promise<void>((resolve, reject) => {
const ws = createWriteStream(file);
rs.pipe(ws);
const ws = createWriteStream(file, { autoClose: true });
rs.on("data", (data: Buffer) => {
ws.write(data, (err) => {
if (err) { reject(err); }
})
})
//rs.pipe(ws);
rs.on("error", reject);
rs.on("end", () => {
ws.close();
resolve();
});
ws.on("error", reject);
ws.on("finish", resolve);
});
Expand Down

0 comments on commit 3499794

Please sign in to comment.