Skip to content

Commit

Permalink
Merge pull request #1388 from emeraldpay/fix/slow-history-update
Browse files Browse the repository at this point in the history
  • Loading branch information
splix authored Dec 23, 2024
2 parents b508cc9 + fd47410 commit 182942a
Show file tree
Hide file tree
Showing 17 changed files with 422 additions and 240 deletions.
62 changes: 43 additions & 19 deletions packages/services/src/services/AllowanceService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Publisher, token as TokenApi } from '@emeraldpay/api';
import {address as AddressApi, Publisher, token as TokenApi} from '@emeraldpay/api';
import { EntryId } from '@emeraldpay/emerald-vault-core';
import { extractWalletId } from '@emeraldpay/emerald-vault-core/lib/types';
import {
Expand All @@ -18,6 +18,7 @@ import { EmeraldApiAccess } from '../emerald-client/ApiAccess';
import { BalanceService } from './balance/BalanceService';
import { Service } from './ServiceManager';
import {isBlockchainId} from "@emeraldwallet/core";
import {BufferedHandler} from "./BuffereHandler";

interface Subscription {
address: string;
Expand All @@ -29,6 +30,20 @@ function isEqual(first: Subscription, second: Subscription): boolean {
return first.address === second.address && first.blockchain === second.blockchain && first.entryId === second.entryId;
}

// from packages/store/src/allowances/types.ts
type Allowance = {
address: string;
allowance: string;
available: string;
blockchain: BlockchainCode;
contractAddress: string;
ownerAddress: string;
spenderAddress: string;
timestamp: number;
ownerControl?: AddressApi.AddressControl;
spenderControl?: AddressApi.AddressControl;
}

type AllowanceHandler = (allowance: TokenApi.AddressAllowanceAmount) => void;

const log = Logger.forCategory('AllowanceService');
Expand All @@ -49,6 +64,8 @@ export class AllowanceService implements Service {
private subscribers: Map<string, Publisher<TokenApi.AddressAllowanceAmount>> = new Map();
private subscriptions: Subscription[] = [];

private buffer: BufferedHandler<Allowance>;

constructor(
ipcMain: IpcMain,
apiAccess: EmeraldApiAccess,
Expand All @@ -66,6 +83,9 @@ export class AllowanceService implements Service {
this.tokens = settings.getTokens();
this.tokenRegistry = new TokenRegistry(this.tokens);

this.buffer = new BufferedHandler<Allowance>(this.submitAllowances());
this.buffer.start();

ipcMain.handle(IpcCommands.ALLOWANCE_SET_TOKENS, (event, tokens) => {
this.tokens = tokens.filter((token: TokenData) => isBlockchainId(token.blockchain));
this.tokenRegistry = new TokenRegistry(this.tokens);
Expand Down Expand Up @@ -179,26 +199,30 @@ export class AllowanceService implements Service {
this.apiAccess.addressClient.describe({ blockchain, address: ownerAddress }),
this.apiAccess.addressClient.describe({ blockchain, address: spenderAddress }),
]).then(([{ control: ownerControl }, { control: spenderControl }]) =>
this.webContents.send(IpcCommands.STORE_DISPATCH, {
type: 'WALLET/ALLOWANCE/SET_ALLOWANCE',
payload: {
allowance: {
address,
allowance,
available,
contractAddress,
ownerAddress,
ownerControl,
spenderAddress,
spenderControl,
blockchain: blockchainCode,
timestamp: Date.now(),
},
tokens: this.tokens,
},
}),
this.buffer.onData({
address,
allowance,
available,
contractAddress,
ownerAddress,
ownerControl,
spenderAddress,
spenderControl,
blockchain: blockchainCode,
timestamp: Date.now(),
})
);
});
};
}

private submitAllowances() {
return async (values: Allowance[]) => {
this.webContents.send(IpcCommands.STORE_DISPATCH, {
type: 'WALLET/ALLOWANCE/SET_ALLOWANCE',
allowances: values,
tokens: this.tokens,
});
};
}
}
76 changes: 76 additions & 0 deletions packages/services/src/services/BuffereHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* A handler that buffers incoming data and flushes it to the handler when the buffer is full or a certain time has passed.
*/
export class BufferedHandler<T> {

/**
* The maximum size of the buffer before it is flushed.
* @private
*/
private readonly limitSize: number = 100;

/**
* The maximum time in milliseconds before the buffer is flushed.
* @private
*/
private readonly limitTimeMs: number = 250;
/**
* The handler that is called when the buffer is flushed.
* @private
*/
private readonly handler: (values: T[]) => Promise<void>;

private buffer: T[] = [];
private closed = false;
private lastFlush = Date.now();

constructor(handler: (values: T[]) => Promise<void>, limitSize?: number, limitTimeMs?: number) {
this.handler = handler;
if (limitSize != null) {
this.limitSize = limitSize;
}
if (limitTimeMs != null) {
this.limitTimeMs = limitTimeMs;
}
}

start() {
this.scheduleNext();
}

private scheduleNext() {
setTimeout(() => {
if (this.lastFlush + this.limitTimeMs >= Date.now()) {
this.flush();
}
if (!this.closed) {
this.scheduleNext();
}
}, this.limitTimeMs - (Date.now() - this.lastFlush));
}

accept(): (tx: T) => void {
return (tx) => this.onData(tx);
}

onData(tx: T): void {
this.buffer.push(tx);
if (this.buffer.length >= this.limitSize) {
this.flush();
}
}

flush(): void {
if (this.buffer.length > 0) {
this.handler(this.buffer)
.catch((e) => console.error('Error while handling buffer', e));
}
this.buffer = [];
this.lastFlush = Date.now();
}

close() {
this.flush();
this.closed = true;
}
}
127 changes: 71 additions & 56 deletions packages/services/src/services/TransactionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import { PersistentStateManager } from '@emeraldwallet/persistent-state';
import { IpcMain, WebContents } from 'electron';
import { EmeraldApiAccess } from '../emerald-client/ApiAccess';
import { Service } from './ServiceManager';
import {BufferedHandler} from "./BuffereHandler";

const { ChangeType: ApiType, Direction: ApiDirection } = TransactionApi;
const { ChangeType: StateType, Direction: StateDirection, State, Status } = PersistentState;

type EntryIdentifier = { entryId: string; blockchain: number; identifier: string };
type TransactionHandler = (tx: TransactionApi.AddressTransaction) => void;
type TransactionHandler = (tx: TransactionApi.AddressTransaction[]) => Promise<void>;

const log = Logger.forCategory('TransactionService');

Expand Down Expand Up @@ -118,17 +119,19 @@ export class TransactionService implements Service {

this.persistentState.txhistory
.query({ state: State.SUBMITTED })
.then(({ items: transactions }) =>
Promise.all(
transactions.map(({ blockchain: txBlockchain, txId }) =>
this.persistentState.txhistory.remove(txBlockchain, txId).then(() =>
this.webContents.send(IpcCommands.STORE_DISPATCH, {
type: 'WALLET/HISTORY/REMOVE_STORED_TX',
txId,
}),
.then(({ items: transactions }) => {
let removePromise = Promise.all(
transactions.map(({blockchain: txBlockchain, txId}) =>
this.persistentState.txhistory.remove(txBlockchain, txId)
),
),
),
);
removePromise.then(() => {
this.webContents.send(IpcCommands.STORE_DISPATCH, {
type: 'WALLET/HISTORY/REMOVE_STORED_TX',
txIds: transactions.map((tx) => tx.txId),
})
})
},
)
.then(() =>
entryIdentifiers.forEach(({ blockchain, entryId, identifier }) =>
Expand Down Expand Up @@ -157,6 +160,8 @@ export class TransactionService implements Service {
private subscribe(identifier: string, blockchain: number, entryId: string): void {
const blockchainCode = blockchainIdToCode(blockchain);

if (blockchain != 0) return;

this.persistentState.txhistory
.getCursor(identifier)
.then((cursor) => {
Expand All @@ -172,29 +177,32 @@ export class TransactionService implements Service {
};

const handler = this.processTransaction(identifier, blockchain, entryId);
const buffer = new BufferedHandler(handler);
buffer.start();

this.apiAccess.transactionClient
.getTransactions(request)
.onData(handler)
.onError((error) =>
log.error(
`Error while getting transactions for ${identifier} on ${blockchainCode},`,
`restart after ${this.restartTimeout} seconds...`,
error,
),
);
.onData(buffer.accept())
.onError((error) => {
log.error(
`Error while getting transactions for ${identifier} on ${blockchainCode},`,
`restart after ${this.restartTimeout} seconds...`,
error,
);
buffer.flush();
});

const subscriber = this.apiAccess.transactionClient
.subscribeTransactions(request)
.onData(handler)
.onData(buffer.accept())
.onError((error) => log.error(`Error while subscribing for ${identifier} on ${blockchainCode}`, error))
.finally(() => {
log.info(
`Subscription for ${identifier} on ${blockchainCode} is closed,`,
`restart after ${this.restartTimeout} seconds...`,
);

setTimeout(() => this.subscribe(identifier, blockchain, entryId), this.restartTimeout * 1000);
buffer.flush();
});

this.subscribers.set(identifier, subscriber);
Expand All @@ -205,22 +213,25 @@ export class TransactionService implements Service {
private processTransaction(identifier: string, blockchain: number, entryId: string): TransactionHandler {
const blockchainCode = blockchainIdToCode(blockchain);

return (tx) => {
log.info(`Receive transaction ${tx.txId} for ${identifier} on ${blockchainCode}...`);
return async (txs: TransactionApi.AddressTransaction[]) => {
let removed = txs.filter((tx) => tx.removed);
let added = txs.filter((tx) => !tx.removed);

if (tx.removed) {
log.info(`Receive transactions ${txs.length} (+${added.length}, -${removed.length}) for ${identifier} on ${blockchainCode}...`);

removed.forEach((tx) => {
this.persistentState.txhistory
.remove(blockchain, tx.txId)
.then(() =>
this.webContents.send(IpcCommands.STORE_DISPATCH, {
type: 'WALLET/HISTORY/REMOVE_STORED_TX',
txId: tx.txId,
}),
)
.catch((error) =>
log.error(`Error while removing transaction for ${identifier} on ${blockchainCode} from state`, error),
);
} else {
});
this.webContents.send(IpcCommands.STORE_DISPATCH, {
type: 'WALLET/HISTORY/REMOVE_STORED_TX',
txIds: removed.map((tx) => tx.txId),
});

let mergedPromise = added.map((tx) => {
let confirmation: PersistentState.TransactionConfirmation | null = null;

const now = new Date();
Expand Down Expand Up @@ -257,38 +268,42 @@ export class TransactionService implements Service {
txId: tx.txId,
};

this.persistentState.txhistory
return this.persistentState.txhistory
.submit({ ...confirmation, ...transaction })
.then((merged) => {
if (tx.cursor != null && tx.cursor.length > 0) {
this.persistentState.txhistory
return this.persistentState.txhistory
.setCursor(identifier, tx.cursor)
.catch((error) =>
log.error(`Error while set cursor ${tx.cursor} for ${identifier} on ${blockchainCode}`, error),
);
)
.then(() => merged);
} else {
return Promise.resolve(merged)
}

const walletId = EntryIdOp.of(entryId).extractWalletId();

this.persistentState.txmeta
.get(blockchainCode, merged.txId)
.then((meta) =>
this.webContents.send(IpcCommands.STORE_DISPATCH, {
meta,
walletId,
type: 'WALLET/HISTORY/UPDATE_STORED_TX',
tokens: this.tokens,
transaction: merged,
}),
)
.catch((error) =>
log.error(`Error while getting transaction meta for ${identifier} on ${blockchainCode}`, error),
);
})
.catch((error) =>
log.error(`Error while submitting transaction data for ${identifier} on ${blockchainCode}`, error),
);
}
};
});

const merged = await Promise.all(mergedPromise);
const meta = await Promise.all(
merged.map((tx) =>
this.persistentState.txmeta
.get(blockchainCode, tx.txId)
.catch((error) => {
log.error(`Error while getting transaction meta for ${identifier} on ${blockchainCode}`, error);
return null;
})
));

const walletId = EntryIdOp.of(entryId).extractWalletId();
this.webContents.send(IpcCommands.STORE_DISPATCH, {
transactions: merged.map((tx, index) => {
return { meta: meta[index], transaction: tx };
}),
type: 'WALLET/HISTORY/UPDATE_STORED_TX',
tokens: this.tokens,
walletId,
});
}
}
}
2 changes: 1 addition & 1 deletion packages/services/src/services/balance/BalanceListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AddressBalance, AnyAsset, BalanceRequest, Publisher, Utxo } from '@emer
import { BlockchainClient } from '@emeraldpay/api-node';
import { BlockchainCode, EthereumAddress, Logger, blockchainCodeToId, isBitcoin } from '@emeraldwallet/core';

interface AddressEvent {
export interface AddressEvent {
address: string;
balance: string;
asset: AnyAsset;
Expand Down
Loading

0 comments on commit 182942a

Please sign in to comment.