From d27562bd4310ca844b842c62e6a1554a35c07dde Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Fri, 7 Jun 2024 01:07:17 +0200 Subject: [PATCH 1/3] fix: Plaid data available syncing --- .../src/api/controllers/Webhooks/Webhooks.ts | 33 ++++--- packages/server/src/interfaces/Account.ts | 4 + .../src/services/Accounts/CreateAccount.ts | 32 ++++--- .../src/services/Banking/Plaid/PlaidSyncDB.ts | 88 ++++++++++++++----- .../Banking/Plaid/PlaidUpdateTransactions.ts | 72 ++++++++++++--- .../services/Cashflow/CashflowApplication.ts | 4 +- .../DeleteCashflowTransactionService.ts | 61 +++++++------ 7 files changed, 209 insertions(+), 85 deletions(-) diff --git a/packages/server/src/api/controllers/Webhooks/Webhooks.ts b/packages/server/src/api/controllers/Webhooks/Webhooks.ts index acfdc8bd6..4bb7e4d5e 100644 --- a/packages/server/src/api/controllers/Webhooks/Webhooks.ts +++ b/packages/server/src/api/controllers/Webhooks/Webhooks.ts @@ -1,4 +1,4 @@ -import { Router } from 'express'; +import { NextFunction, Router } from 'express'; import { PlaidApplication } from '@/services/Banking/Plaid/PlaidApplication'; import { Request, Response } from 'express'; import { Inject, Service } from 'typedi'; @@ -57,20 +57,25 @@ export class Webhooks extends BaseController { * @param {Response} res * @returns {Response} */ - public async plaidWebhooks(req: Request, res: Response) { + public async plaidWebhooks(req: Request, res: Response, next: NextFunction) { const { tenantId } = req; - const { - webhook_type: webhookType, - webhook_code: webhookCode, - item_id: plaidItemId, - } = req.body; - await this.plaidApp.webhooks( - tenantId, - plaidItemId, - webhookType, - webhookCode - ); - return res.status(200).send({ code: 200, message: 'ok' }); + try { + const { + webhook_type: webhookType, + webhook_code: webhookCode, + item_id: plaidItemId, + } = req.body; + + await this.plaidApp.webhooks( + tenantId, + plaidItemId, + webhookType, + webhookCode + ); + return res.status(200).send({ code: 200, message: 'ok' }); + } catch (error) { + next(error); + } } } diff --git a/packages/server/src/interfaces/Account.ts b/packages/server/src/interfaces/Account.ts index 7c045def3..43ba31444 100644 --- a/packages/server/src/interfaces/Account.ts +++ b/packages/server/src/interfaces/Account.ts @@ -164,3 +164,7 @@ export enum TaxRateAction { DELETE = 'Delete', VIEW = 'View', } + +export interface CreateAccountParams { + ignoreUniqueName: boolean; +} diff --git a/packages/server/src/services/Accounts/CreateAccount.ts b/packages/server/src/services/Accounts/CreateAccount.ts index c0eff3a55..da80d3af4 100644 --- a/packages/server/src/services/Accounts/CreateAccount.ts +++ b/packages/server/src/services/Accounts/CreateAccount.ts @@ -7,6 +7,7 @@ import { IAccountEventCreatedPayload, IAccountEventCreatingPayload, IAccountCreateDTO, + CreateAccountParams, } from '@/interfaces'; import events from '@/subscribers/events'; import UnitOfWork from '@/services/UnitOfWork'; @@ -30,19 +31,22 @@ export class CreateAccount { /** * Authorize the account creation. - * @param {number} tenantId - * @param {IAccountCreateDTO} accountDTO + * @param {number} tenantId + * @param {IAccountCreateDTO} accountDTO */ private authorize = async ( tenantId: number, accountDTO: IAccountCreateDTO, - baseCurrency: string + baseCurrency: string, + params?: CreateAccountParams ) => { // Validate account name uniquiness. - await this.validator.validateAccountNameUniquiness( - tenantId, - accountDTO.name - ); + if (!params.ignoreUniqueName) { + await this.validator.validateAccountNameUniquiness( + tenantId, + accountDTO.name + ); + } // Validate the account code uniquiness. if (accountDTO.code) { await this.validator.isAccountCodeUniqueOrThrowError( @@ -82,7 +86,7 @@ export class CreateAccount { /** * Transformes the create account DTO to input model. - * @param {IAccountCreateDTO} createAccountDTO + * @param {IAccountCreateDTO} createAccountDTO */ private transformDTOToModel = ( createAccountDTO: IAccountCreateDTO, @@ -104,7 +108,8 @@ export class CreateAccount { public createAccount = async ( tenantId: number, accountDTO: IAccountCreateDTO, - trx?: Knex.Transaction + trx?: Knex.Transaction, + params: CreateAccountParams = { ignoreUniqueName: false } ): Promise => { const { Account } = this.tenancy.models(tenantId); @@ -112,8 +117,12 @@ export class CreateAccount { const tenantMeta = await TenantMetadata.query().findOne({ tenantId }); // Authorize the account creation. - await this.authorize(tenantId, accountDTO, tenantMeta.baseCurrency); - + await this.authorize( + tenantId, + accountDTO, + tenantMeta.baseCurrency, + params + ); // Transformes the DTO to model. const accountInputModel = this.transformDTOToModel( accountDTO, @@ -148,3 +157,4 @@ export class CreateAccount { ); }; } + diff --git a/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts b/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts index b3bf85ddc..1137db82f 100644 --- a/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts +++ b/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts @@ -3,7 +3,11 @@ import { Inject, Service } from 'typedi'; import bluebird from 'bluebird'; import { entries, groupBy } from 'lodash'; import { CreateAccount } from '@/services/Accounts/CreateAccount'; -import { PlaidAccount, PlaidTransaction } from '@/interfaces'; +import { + IAccountCreateDTO, + PlaidAccount, + PlaidTransaction, +} from '@/interfaces'; import { transformPlaidAccountToCreateAccount, transformPlaidTrxsToCashflowCreate, @@ -11,6 +15,7 @@ import { import { DeleteCashflowTransaction } from '@/services/Cashflow/DeleteCashflowTransactionService'; import HasTenancyService from '@/services/Tenancy/TenancyService'; import { CashflowApplication } from '@/services/Cashflow/CashflowApplication'; +import { Knex } from 'knex'; const CONCURRENCY_ASYNC = 10; @@ -28,6 +33,35 @@ export class PlaidSyncDb { @Inject() private deleteCashflowTransactionService: DeleteCashflowTransaction; + /** + * Syncs the Plaid bank account. + * @param {number} tenantId + * @param {IAccountCreateDTO} createBankAccountDTO + * @param {Knex.Transaction} trx + * @returns {Promise} + */ + public async syncBankAccount( + tenantId: number, + createBankAccountDTO: IAccountCreateDTO, + trx?: Knex.Transaction + ) { + const { Account } = this.tenancy.models(tenantId); + const plaidAccount = Account.query().findOne( + 'plaidAccountId', + createBankAccountDTO.plaidAccountId + ); + // Can't continue if the Plaid account is already created. + if (plaidAccount) { + return; + } + await this.createAccountService.createAccount( + tenantId, + createBankAccountDTO, + trx, + { ignoreUniqueName: true } + ); + } + /** * Syncs the plaid accounts to the system accounts. * @param {number} tenantId Tenant ID. @@ -37,7 +71,8 @@ export class PlaidSyncDb { public async syncBankAccounts( tenantId: number, plaidAccounts: PlaidAccount[], - institution: any + institution: any, + trx?: Knex.Transaction ): Promise { const transformToPlaidAccounts = transformPlaidAccountToCreateAccount(institution); @@ -47,7 +82,7 @@ export class PlaidSyncDb { await bluebird.map( accountCreateDTOs, (createAccountDTO: any) => - this.createAccountService.createAccount(tenantId, createAccountDTO), + this.syncBankAccount(tenantId, createAccountDTO, trx), { concurrency: CONCURRENCY_ASYNC } ); } @@ -61,7 +96,8 @@ export class PlaidSyncDb { public async syncAccountTranactions( tenantId: number, plaidAccountId: number, - plaidTranasctions: PlaidTransaction[] + plaidTranasctions: PlaidTransaction[], + trx?: Knex.Transaction ): Promise { const { Account } = this.tenancy.models(tenantId); @@ -87,7 +123,8 @@ export class PlaidSyncDb { (uncategoriedDTO) => this.cashflowApp.createUncategorizedTransaction( tenantId, - uncategoriedDTO + uncategoriedDTO, + trx ), { concurrency: 1 } ); @@ -100,7 +137,8 @@ export class PlaidSyncDb { */ public async syncAccountsTransactions( tenantId: number, - plaidAccountsTransactions: PlaidTransaction[] + plaidAccountsTransactions: PlaidTransaction[], + trx?: Knex.Transaction ): Promise { const groupedTrnsxByAccountId = entries( groupBy(plaidAccountsTransactions, 'account_id') @@ -111,7 +149,8 @@ export class PlaidSyncDb { return this.syncAccountTranactions( tenantId, plaidAccountId, - plaidTransactions + plaidTransactions, + trx ); }, { concurrency: CONCURRENCY_ASYNC } @@ -124,11 +163,12 @@ export class PlaidSyncDb { */ public async syncRemoveTransactions( tenantId: number, - plaidTransactionsIds: string[] + plaidTransactionsIds: string[], + trx?: Knex.Transaction ) { const { CashflowTransaction } = this.tenancy.models(tenantId); - const cashflowTransactions = await CashflowTransaction.query().whereIn( + const cashflowTransactions = await CashflowTransaction.query(trx).whereIn( 'plaidTransactionId', plaidTransactionsIds ); @@ -140,7 +180,8 @@ export class PlaidSyncDb { (transactionId: number) => this.deleteCashflowTransactionService.deleteCashflowTransaction( tenantId, - transactionId + transactionId, + trx ), { concurrency: CONCURRENCY_ASYNC } ); @@ -155,11 +196,12 @@ export class PlaidSyncDb { public async syncTransactionsCursor( tenantId: number, plaidItemId: string, - lastCursor: string + lastCursor: string, + trx?: Knex.Transaction ) { const { PlaidItem } = this.tenancy.models(tenantId); - await PlaidItem.query().findOne({ plaidItemId }).patch({ lastCursor }); + await PlaidItem.query(trx).findOne({ plaidItemId }).patch({ lastCursor }); } /** @@ -169,13 +211,16 @@ export class PlaidSyncDb { */ public async updateLastFeedsUpdatedAt( tenantId: number, - plaidAccountIds: string[] + plaidAccountIds: string[], + trx?: Knex.Transaction ) { const { Account } = this.tenancy.models(tenantId); - await Account.query().whereIn('plaid_account_id', plaidAccountIds).patch({ - lastFeedsUpdatedAt: new Date(), - }); + await Account.query(trx) + .whereIn('plaid_account_id', plaidAccountIds) + .patch({ + lastFeedsUpdatedAt: new Date(), + }); } /** @@ -187,12 +232,15 @@ export class PlaidSyncDb { public async updateAccountsFeedsActive( tenantId: number, plaidAccountIds: string[], - isFeedsActive: boolean = true + isFeedsActive: boolean = true, + trx?: Knex.Transaction ) { const { Account } = this.tenancy.models(tenantId); - await Account.query().whereIn('plaid_account_id', plaidAccountIds).patch({ - isFeedsActive, - }); + await Account.query(trx) + .whereIn('plaid_account_id', plaidAccountIds) + .patch({ + isFeedsActive, + }); } } diff --git a/packages/server/src/services/Banking/Plaid/PlaidUpdateTransactions.ts b/packages/server/src/services/Banking/Plaid/PlaidUpdateTransactions.ts index c740e4705..9d23aef7c 100644 --- a/packages/server/src/services/Banking/Plaid/PlaidUpdateTransactions.ts +++ b/packages/server/src/services/Banking/Plaid/PlaidUpdateTransactions.ts @@ -3,6 +3,8 @@ import { Inject, Service } from 'typedi'; import { PlaidClientWrapper } from '@/lib/Plaid/Plaid'; import { PlaidSyncDb } from './PlaidSyncDB'; import { PlaidFetchedTransactionsUpdates } from '@/interfaces'; +import UnitOfWork from '@/services/UnitOfWork'; +import { Knex } from 'knex'; @Service() export class PlaidUpdateTransactions { @@ -12,12 +14,40 @@ export class PlaidUpdateTransactions { @Inject() private plaidSync: PlaidSyncDb; + @Inject() + private uow: UnitOfWork; + /** - * Handles the fetching and storing of new, modified, or removed transactions - * @param {number} tenantId Tenant ID. - * @param {string} plaidItemId the Plaid ID for the item. + * Handles sync the Plaid item to Bigcaptial under UOW. + * @param {number} tenantId + * @param {number} plaidItemId + * @returns {Promise<{ addedCount: number; modifiedCount: number; removedCount: number; }>} */ public async updateTransactions(tenantId: number, plaidItemId: string) { + return this.uow.withTransaction(tenantId, (trx: Knex.Transaction) => { + return this.updateTransactionsWork(tenantId, plaidItemId, trx); + }); + } + + /** + * Handles the fetching and storing the following: + * - New, modified, or removed transactions. + * - New bank accounts. + * - Last accounts feeds updated at. + * - Turn on the accounts feed flag. + * @param {number} tenantId - Tenant ID. + * @param {string} plaidItemId - The Plaid ID for the item. + * @returns {Promise<{ addedCount: number; modifiedCount: number; removedCount: number; }>} + */ + public async updateTransactionsWork( + tenantId: number, + plaidItemId: string, + trx?: Knex.Transaction + ): Promise<{ + addedCount: number; + modifiedCount: number; + removedCount: number; + }> { // Fetch new transactions from plaid api. const { added, modified, removed, cursor, accessToken } = await this.fetchTransactionUpdates(tenantId, plaidItemId); @@ -29,28 +59,42 @@ export class PlaidUpdateTransactions { } = await plaidInstance.accountsGet(request); const plaidAccountsIds = accounts.map((a) => a.account_id); - const { data: { institution }, } = await plaidInstance.institutionsGetById({ institution_id: item.institution_id, country_codes: ['US', 'UK'], }); - // Update the DB. - await this.plaidSync.syncBankAccounts(tenantId, accounts, institution); + // Sync bank accounts. + await this.plaidSync.syncBankAccounts(tenantId, accounts, institution, trx); + // Sync bank account transactions. await this.plaidSync.syncAccountsTransactions( tenantId, - added.concat(modified) + added.concat(modified), + trx + ); + // Sync removed transactions. + await this.plaidSync.syncRemoveTransactions(tenantId, removed, trx); + // Sync transactions cursor. + await this.plaidSync.syncTransactionsCursor( + tenantId, + plaidItemId, + cursor, + trx ); - await this.plaidSync.syncRemoveTransactions(tenantId, removed); - await this.plaidSync.syncTransactionsCursor(tenantId, plaidItemId, cursor); - // Update the last feeds updated at of the updated accounts. - await this.plaidSync.updateLastFeedsUpdatedAt(tenantId, plaidAccountsIds); - + await this.plaidSync.updateLastFeedsUpdatedAt( + tenantId, + plaidAccountsIds, + trx + ); // Turn on the accounts feeds flag. - await this.plaidSync.updateAccountsFeedsActive(tenantId, plaidAccountsIds); - + await this.plaidSync.updateAccountsFeedsActive( + tenantId, + plaidAccountsIds, + true, + trx + ); return { addedCount: added.length, modifiedCount: modified.length, diff --git a/packages/server/src/services/Cashflow/CashflowApplication.ts b/packages/server/src/services/Cashflow/CashflowApplication.ts index 6688c9016..5aaa9936f 100644 --- a/packages/server/src/services/Cashflow/CashflowApplication.ts +++ b/packages/server/src/services/Cashflow/CashflowApplication.ts @@ -1,3 +1,4 @@ +import { Knex } from 'knex'; import { Inject, Service } from 'typedi'; import { DeleteCashflowTransaction } from './DeleteCashflowTransactionService'; import { UncategorizeCashflowTransaction } from './UncategorizeCashflowTransaction'; @@ -119,7 +120,8 @@ export class CashflowApplication { */ public createUncategorizedTransaction( tenantId: number, - createUncategorizedTransactionDTO: CreateUncategorizedTransactionDTO + createUncategorizedTransactionDTO: CreateUncategorizedTransactionDTO, + trx?: Knex.Transaction ) { return this.createUncategorizedTransactionService.create( tenantId, diff --git a/packages/server/src/services/Cashflow/DeleteCashflowTransactionService.ts b/packages/server/src/services/Cashflow/DeleteCashflowTransactionService.ts index dd6c3002a..dacf60c6f 100644 --- a/packages/server/src/services/Cashflow/DeleteCashflowTransactionService.ts +++ b/packages/server/src/services/Cashflow/DeleteCashflowTransactionService.ts @@ -30,7 +30,8 @@ export class DeleteCashflowTransaction { */ public deleteCashflowTransaction = async ( tenantId: number, - cashflowTransactionId: number + cashflowTransactionId: number, + trx?: Knex.Transaction ): Promise<{ oldCashflowTransaction: ICashflowTransaction }> => { const { CashflowTransaction, CashflowTransactionLine } = this.tenancy.models(tenantId); @@ -43,34 +44,44 @@ export class DeleteCashflowTransaction { this.throwErrorIfTransactionNotFound(oldCashflowTransaction); // Starting database transaction. - return this.uow.withTransaction(tenantId, async (trx: Knex.Transaction) => { - // Triggers `onCashflowTransactionDelete` event. - await this.eventPublisher.emitAsync(events.cashflow.onTransactionDeleting, { - trx, - tenantId, - oldCashflowTransaction, - } as ICommandCashflowDeletingPayload); + return this.uow.withTransaction( + tenantId, + async (trx: Knex.Transaction) => { + // Triggers `onCashflowTransactionDelete` event. + await this.eventPublisher.emitAsync( + events.cashflow.onTransactionDeleting, + { + trx, + tenantId, + oldCashflowTransaction, + } as ICommandCashflowDeletingPayload + ); - // Delete cashflow transaction associated lines first. - await CashflowTransactionLine.query(trx) - .where('cashflow_transaction_id', cashflowTransactionId) - .delete(); + // Delete cashflow transaction associated lines first. + await CashflowTransactionLine.query(trx) + .where('cashflow_transaction_id', cashflowTransactionId) + .delete(); - // Delete cashflow transaction. - await CashflowTransaction.query(trx) - .findById(cashflowTransactionId) - .delete(); + // Delete cashflow transaction. + await CashflowTransaction.query(trx) + .findById(cashflowTransactionId) + .delete(); - // Triggers `onCashflowTransactionDeleted` event. - await this.eventPublisher.emitAsync(events.cashflow.onTransactionDeleted, { - trx, - tenantId, - cashflowTransactionId, - oldCashflowTransaction, - } as ICommandCashflowDeletedPayload); + // Triggers `onCashflowTransactionDeleted` event. + await this.eventPublisher.emitAsync( + events.cashflow.onTransactionDeleted, + { + trx, + tenantId, + cashflowTransactionId, + oldCashflowTransaction, + } as ICommandCashflowDeletedPayload + ); - return { oldCashflowTransaction }; - }); + return { oldCashflowTransaction }; + }, + trx + ); }; /** From 494d2c1fe086c26b772441d718cba66e95fe0f4e Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Fri, 7 Jun 2024 01:11:19 +0200 Subject: [PATCH 2/3] fix: TS typing --- packages/server/src/api/controllers/Webhooks/Webhooks.ts | 7 +++---- .../server/src/services/Cashflow/CashflowApplication.ts | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/server/src/api/controllers/Webhooks/Webhooks.ts b/packages/server/src/api/controllers/Webhooks/Webhooks.ts index 4bb7e4d5e..9abc03dfa 100644 --- a/packages/server/src/api/controllers/Webhooks/Webhooks.ts +++ b/packages/server/src/api/controllers/Webhooks/Webhooks.ts @@ -1,7 +1,6 @@ -import { NextFunction, Router } from 'express'; -import { PlaidApplication } from '@/services/Banking/Plaid/PlaidApplication'; -import { Request, Response } from 'express'; +import { NextFunction, Router, Request, Response } from 'express'; import { Inject, Service } from 'typedi'; +import { PlaidApplication } from '@/services/Banking/Plaid/PlaidApplication'; import BaseController from '../BaseController'; import { LemonSqueezyWebhooks } from '@/services/Subscription/LemonSqueezyWebhooks'; import { PlaidWebhookTenantBootMiddleware } from '@/services/Banking/Plaid/PlaidWebhookTenantBootMiddleware'; @@ -34,7 +33,7 @@ export class Webhooks extends BaseController { * @param {Response} res * @returns {Response} */ - public async lemonWebhooks(req: Request, res: Response, next: any) { + public async lemonWebhooks(req: Request, res: Response, next: NextFunction) { const data = req.body; const signature = req.headers['x-signature'] ?? ''; const rawBody = req.rawBody; diff --git a/packages/server/src/services/Cashflow/CashflowApplication.ts b/packages/server/src/services/Cashflow/CashflowApplication.ts index 5aaa9936f..f7487badd 100644 --- a/packages/server/src/services/Cashflow/CashflowApplication.ts +++ b/packages/server/src/services/Cashflow/CashflowApplication.ts @@ -125,7 +125,8 @@ export class CashflowApplication { ) { return this.createUncategorizedTransactionService.create( tenantId, - createUncategorizedTransactionDTO + createUncategorizedTransactionDTO, + trx ); } From 3dadbeac4dc68997f334f70b49f4822b791303f1 Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Fri, 7 Jun 2024 01:30:08 +0200 Subject: [PATCH 3/3] fix: all sql queries should be under one transaction --- packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts b/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts index 1137db82f..004662cfd 100644 --- a/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts +++ b/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts @@ -46,7 +46,7 @@ export class PlaidSyncDb { trx?: Knex.Transaction ) { const { Account } = this.tenancy.models(tenantId); - const plaidAccount = Account.query().findOne( + const plaidAccount = await Account.query().findOne( 'plaidAccountId', createBankAccountDTO.plaidAccountId ); @@ -101,11 +101,11 @@ export class PlaidSyncDb { ): Promise { const { Account } = this.tenancy.models(tenantId); - const cashflowAccount = await Account.query() + const cashflowAccount = await Account.query(trx) .findOne({ plaidAccountId }) .throwIfNotFound(); - const openingEquityBalance = await Account.query().findOne( + const openingEquityBalance = await Account.query(trx).findOne( 'slug', 'opening-balance-equity' );