Skip to content

Commit

Permalink
feat: stateful multi-connection download
Browse files Browse the repository at this point in the history
  • Loading branch information
ido-pluto committed Mar 12, 2024
1 parent 8b3d0c8 commit 346c5a5
Show file tree
Hide file tree
Showing 27 changed files with 406 additions and 234 deletions.
Binary file modified assets/pull-file.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
100 changes: 91 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@
"xmlhttprequest-ssl": "^2.1.1"
},
"dependencies": {
"@supercharge/promise-pool": "^3.1.1",
"async-retry": "^1.3.3",
"axios": "^1.6.7",
"chalk": "^5.3.0",
"cli-spinners": "^2.9.2",
"commander": "^10.0.0",
Expand Down
3 changes: 3 additions & 0 deletions src/download/browser-download.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import DownloadEngineBrowser, {DownloadEngineOptionsBrowser} from "./download-engine/engine/download-engine-browser.js";
import DownloadEngineMultiDownload from "./download-engine/engine/download-engine-multi-download.js";

export const DEFAULT_PARALLEL_STREAMS_FOR_BROWSER = 3;

export type DownloadFileBrowserOptions = DownloadEngineOptionsBrowser;

/**
* Download one file in the browser environment.
*/
export async function downloadFileBrowser(options: DownloadFileBrowserOptions) {
options.parallelStreams ??= DEFAULT_PARALLEL_STREAMS_FOR_BROWSER;
return await DownloadEngineBrowser.createFromOptions(options);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {PromisePool, Stoppable} from "@supercharge/promise-pool";
import ProgressStatusFile, {ProgressStatus} from "./progress-status-file.js";
import {ChunkStatus, DownloadFile, SaveProgressInfo} from "./types.js";
import BaseDownloadEngineFetchStream from "./streams/download-engine-fetch-stream/base-download-engine-fetch-stream.js";
import BaseDownloadEngineWriteStream from "./streams/download-engine-write-stream/base-download-engine-write-stream.js";
import {ChunkStatus, DownloadFile, SaveProgressInfo} from "../types.js";
import BaseDownloadEngineFetchStream from "../streams/download-engine-fetch-stream/base-download-engine-fetch-stream.js";
import BaseDownloadEngineWriteStream from "../streams/download-engine-write-stream/base-download-engine-write-stream.js";
import retry from "async-retry";
import {EventEmitter} from "eventemitter3";
import {withLock} from "lifecycle-utils";
import DownloadProgram from "./download-program.js";

export type DownloadEngineFileOptions = {
chunkSize?: number;
Expand Down Expand Up @@ -47,12 +47,12 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
protected _progress: SaveProgressInfo = {
part: 0,
chunks: [],
chunkSize: 0
chunkSize: 0,
parallelStreams: 0
};

protected _closed = false;
protected _progressStatus: ProgressStatusFile;
protected _activePool?: Stoppable;
protected _activeStreamBytes: { [key: number]: number } = {};

public constructor(file: DownloadFile, options: DownloadEngineFileOptions) {
Expand Down Expand Up @@ -99,7 +99,8 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
this._progress = {
part: 0,
chunks: this._emptyChunksForPart(0),
chunkSize: this.options.chunkSize
chunkSize: this.options.chunkSize,
parallelStreams: this.options.parallelStreams
};
}
}
Expand All @@ -111,79 +112,58 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
if (i > this._progress.part || !this._activePart.acceptRange) {
this._progress.part = i;
this._progress.chunkSize = this.options.chunkSize;
this._progress.parallelStreams = this.options.parallelStreams;
this._progress.chunks = this._emptyChunksForPart(i);
}

this._activeStreamBytes = {};
if (this._activePart.acceptRange && this.options.parallelStreams > 1) {
await this._downloadPartParallelStream();
} else {
await this._downloadWithoutParallelStreams();
if (!this._activePart.acceptRange) {
this._progress.parallelStreams = 1;
}

this._activeStreamBytes = {};
const downloadProgram = new DownloadProgram(this._progress, this._downloadSlice.bind(this));
await downloadProgram.download();
}
await this._saveProgress();
this._progressStatus.finished();
await this._saveProgress();
this.emit("finished");
await this.options.onFinishAsync?.();
}

protected async _downloadWithoutParallelStreams() {
const startIndex = this._progress.chunks.findIndex(status => status !== ChunkStatus.COMPLETE);
if (startIndex === -1) return;
const startByteDownloaded = startIndex * this._progress.chunkSize;

protected async _downloadSlice(startChunk: number, endChunk: number) {
const fetchState = this.options.fetchStream.withSubState({
chunkSize: this._progress.chunkSize,
start: startByteDownloaded,
end: this.downloadSize,
startChunk,
endChunk,
totalSize: this.downloadSize,
url: this._activePart.downloadURL!,
rangeSupport: this._activePart.acceptRange,
onProgress: (length: number) => {
this._activeStreamBytes[0] = length;
this._activeStreamBytes[startChunk] = length;
this._sendProgressDownloadPart();
}
});

await fetchState.fetchChunks((chunk, index) => {
this._progress.chunks[startChunk] = ChunkStatus.IN_PROGRESS;
await fetchState.fetchChunks((chunks, writePosition, index) => {
if (this._closed) return;

const byteDownloaded = startByteDownloaded + index * this._progress.chunkSize;
this.options.writeStream.write(byteDownloaded, chunk);
this._progress.chunks[startIndex + index] = ChunkStatus.COMPLETE;
this._activeStreamBytes[0] = 0;
this._saveProgress();
});
}
for (const chunk of chunks) {
this.options.writeStream.write(writePosition, chunk);
writePosition += chunk.length;
}

protected async _downloadPartParallelStream() {
try {
await PromisePool.withConcurrency(this.options.parallelStreams)
.for(this._progress.chunks)
.process(async (status, index, pool) => {
await this.options.fetchStream.paused;
this._activePool = pool;
if (status !== ChunkStatus.NOT_STARTED) {
return;
}
this._activeStreamBytes[index] = 0;
this._progress.chunks[index] = ChunkStatus.IN_PROGRESS;

const start = index * this._progress.chunkSize;
const end = Math.min(start + this._progress.chunkSize, this._activePart.size);
const buffer = await this.options.fetchStream.fetchBytes(this._activePart.downloadURL!, start, end, (length: number) => {
this._activeStreamBytes[index] = length;
this._sendProgressDownloadPart();
});

await this.options.writeStream.write(start, buffer);
this._progress.chunks[index] = ChunkStatus.COMPLETE;
delete this._activeStreamBytes[index];

this._saveProgress();
});
} finally {
this._activePool = undefined;
}
this._progress.chunks[index] = ChunkStatus.COMPLETE;
delete this._activeStreamBytes[startChunk];
void this._saveProgress();

if (this._progress.chunks[index + 1] != ChunkStatus.NOT_STARTED) {
return fetchState.close();
}

this._progress.chunks[index + 1] = ChunkStatus.IN_PROGRESS;
});
delete this._activeStreamBytes[startChunk];
}

protected async _saveProgress() {
Expand Down Expand Up @@ -219,7 +199,6 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
public async close() {
if (this._closed) return;
this._closed = true;
this._activePool?.stop?.();
await this.options.writeStream.close();
await this.options.fetchStream.close();
this.emit("closed");
Expand Down
Loading

0 comments on commit 346c5a5

Please sign in to comment.