Skip to content

Commit

Permalink
Merge pull request #37 from getzep/feat/batched-doc-upload
Browse files Browse the repository at this point in the history
feat: document upload batching
  • Loading branch information
danielchalef authored Oct 17, 2023
2 parents 9c9f478 + 83fdaa4 commit e0b2a09
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 62 deletions.
2 changes: 1 addition & 1 deletion examples/documents/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function naiveSplitText(text: string, maxChunkSize: number): string[] {
}

async function main() {
const file = "examples/documents/babbages_calculating_engine.txt";
const file = "babbages_calculating_engine.txt";
const zepApiUrl = "http://localhost:8000";
const maxChunkSize = 500;
const collectionName = `babbage${faker.string.alphanumeric({ length: 8 })}`;
Expand Down
180 changes: 132 additions & 48 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"author": "Daniel Chalef <[email protected]>",
"license": "Apache-2.0",
"dependencies": {
"@supercharge/promise-pool": "^3.1.0",
"semver": "^7.5.4",
"typescript": "^5.1.6"
},
Expand Down
54 changes: 41 additions & 13 deletions src/document_collection.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PromisePool } from "@supercharge/promise-pool";
import {
docsToDocsWithFloatArray,
docsWithFloatArrayToDocs,
Expand All @@ -11,6 +12,8 @@ import { handleRequest, isFloat } from "./utils";
import { APIError } from "./errors";

const MIN_DOCS_TO_INDEX = 10_000;
const DEFAULT_BATCH_SIZE = 500;
const MAX_CONCURRENT_BATCHES = 5;
const LARGE_BATCH_WARNING_LIMIT = 1000;
const LARGE_BATCH_WARNING = `Batch size is greater than ${LARGE_BATCH_WARNING_LIMIT}.
This may result in slow performance or out-of-memory failures.`;
Expand Down Expand Up @@ -64,20 +67,45 @@ export default class DocumentCollection extends DocumentCollectionModel {
if (documents.length > LARGE_BATCH_WARNING_LIMIT) {
console.warn(LARGE_BATCH_WARNING);
}
const body = JSON.stringify(docsWithFloatArrayToDocs(documents));
const url = this.client.getFullUrl(`/collection/${this.name}/document`);
const response = await handleRequest(
fetch(url, {
method: "POST",
headers: {
...this.client.headers,
"Content-Type": "application/json",
},
body,
})
);

return response.json();
// 1. Split the documents into batches of DEFAULT_BATCH_SIZE
const batches = [];
for (let i = 0; i < documents.length; i += DEFAULT_BATCH_SIZE) {
batches.push(documents.slice(i, i + DEFAULT_BATCH_SIZE));
}

// 2. Create a function that will take a batch of documents and
// return a promise that resolves when the batch is uploaded.
const uploadBatch = async (batch: IDocument[]) => {
const body = JSON.stringify(docsWithFloatArrayToDocs(batch));
const url = this.client.getFullUrl(
`/collection/${this.name}/document`
);
const response = await handleRequest(
fetch(url, {
method: "POST",
headers: {
...this.client.headers,
"Content-Type": "application/json",
},
body,
})
);

return response.json();
};

// 3. Upload the batches in parallel
// limit the number of concurrent batches to MAX_CONCURRENT_BATCHES
const { results } = await PromisePool.for(batches)
.withConcurrency(MAX_CONCURRENT_BATCHES)
.process(async (batch) => {
const result = await uploadBatch(batch);
return result;
});

// Flatten the results array
return results.flat();
}

/**
Expand Down

0 comments on commit e0b2a09

Please sign in to comment.