Skip to content

Commit

Permalink
Insert records in batches sequentially to avoid overloading the DB
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Sep 9, 2024
1 parent 0819b29 commit 7dc7575
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
15 changes: 8 additions & 7 deletions core/src/ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
const chunk = <T>(array: T[], size: number): T[][] =>
[...Array(Math.ceil(array.length / size))].map((_, i) => array.slice(size * i, size + size * i))

const persistFetchedRecords = ({
const persistFetchedRecords = async ({
orm,
fetchedRecords,
insertBatchSize,
Expand All @@ -25,11 +25,13 @@ const persistFetchedRecords = ({
insertBatchSize: number
}) => {
logger.info(`Persisting ${fetchedRecords.length} change message(s)...`)
chunk(fetchedRecords, insertBatchSize).forEach((fetchedRecs) => {
const batches = chunk(fetchedRecords, insertBatchSize)

for (const fetchedRecs of batches) {
const changesAttributes = fetchedRecs.map(({ changeAttributes }) => changeAttributes)
const queryBuilder = orm.em.createQueryBuilder(Change).insert(changesAttributes).onConflict().ignore()
queryBuilder.execute()
})
await queryBuilder.execute()
}
}

const fetchNatsMessages = async ({
Expand Down Expand Up @@ -121,11 +123,10 @@ export const runIngestionLoop = async ({

// Persisting and acking
if (stitchedFetchedRecords.length) {
persistFetchedRecords({ orm, fetchedRecords: stitchedFetchedRecords, insertBatchSize })
try {
await orm.em.flush()
await persistFetchedRecords({ orm, fetchedRecords: stitchedFetchedRecords, insertBatchSize })
} catch (e) {
logger.info(`Error while flushing: ${e}`)
logger.info(`Error while saving: ${e}`)
throw e
}
}
Expand Down
5 changes: 5 additions & 0 deletions docs/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ keywords: ['Bemi Changelog', 'Bemi New Features', 'Postgres Audit Trails', 'Chan

# Changelog

## 2024-09

* [Bemi Core](https://github.com/BemiHQ/bemi)
* Insert records in batches sequentially to avoid overloading the database at scale

## 2024-08

* Platform
Expand Down

0 comments on commit 7dc7575

Please sign in to comment.