From 66aea7f6eef5dcafbf6c73c60e573de83f25818a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Fri, 4 Sep 2020 21:52:51 -0300 Subject: [PATCH] sdk: replace reducer epics with persister middleware --- raiden-ts/src/db/adapter.ts | 2 +- raiden-ts/src/db/epics.ts | 43 +----- raiden-ts/src/db/types.ts | 1 + raiden-ts/src/db/utils.ts | 23 ++- raiden-ts/src/helpers.ts | 4 +- raiden-ts/src/persister.ts | 91 ++++++++++++ raiden-ts/src/raiden.ts | 4 +- raiden-ts/src/transfers/epics/db.ts | 188 ------------------------ raiden-ts/src/transfers/epics/index.ts | 1 - raiden-ts/src/transfers/epics/locked.ts | 14 +- raiden-ts/src/transfers/persister.ts | 167 +++++++++++++++++++++ raiden-ts/src/transfers/utils.ts | 2 +- raiden-ts/tests/unit/mocks.ts | 4 +- 13 files changed, 292 insertions(+), 252 deletions(-) create mode 100644 raiden-ts/src/persister.ts delete mode 100644 raiden-ts/src/transfers/epics/db.ts create mode 100644 raiden-ts/src/transfers/persister.ts diff --git a/raiden-ts/src/db/adapter.ts b/raiden-ts/src/db/adapter.ts index def788da88..ad22aba8d4 100644 --- a/raiden-ts/src/db/adapter.ts +++ b/raiden-ts/src/db/adapter.ts @@ -14,7 +14,7 @@ let defaultPouchAdapter: string; * @param descending - Wether to swap start & endkey for reverse reverse search * @returns allDocs's options to fetch all documents which keys start with prefix */ -function byPrefix(prefix: string, descending = false) { +export function byPrefix(prefix: string, descending = false) { const start = prefix; const end = prefix + '\ufff0'; return !descending diff --git a/raiden-ts/src/db/epics.ts b/raiden-ts/src/db/epics.ts index 562ee8d045..7efa6a19e6 100644 --- a/raiden-ts/src/db/epics.ts +++ b/raiden-ts/src/db/epics.ts @@ -1,50 +1,9 @@ import { Observable } from 'rxjs'; -import { distinctUntilChanged, ignoreElements, pairwise, finalize, tap } from 'rxjs/operators'; +import { ignoreElements, finalize } from 'rxjs/operators'; import { RaidenAction } from '../actions'; import { RaidenState } from '../state'; import { RaidenEpicDeps } from '../types'; -import { upsert } from './utils'; - -/** - * Update state based on actions and state changes - * - * For certain common actions with trivial reduced state side-effects, it may try to produce an - * optimized update command. Otherwise, the whole state will be upserted. - * - * @param action$ - Observable of RaidenActions - * @param state$ - Observable of RaidenStates - * @param deps - Epics dependencies - * @param deps.db - Database instance - * @returns observable to persist state changes to db - */ -export const dbStateEpic = ( - {}: Observable, - state$: Observable, - { db }: RaidenEpicDeps, -): Observable => - state$.pipe( - distinctUntilChanged(), - pairwise(), - tap(([prev, cur]) => { - for (const k in cur) { - const key = k as keyof RaidenState; - // key has same value, pass over - if (cur[key] === prev[key]) continue; - else if (key === 'channels' || key === 'oldChannels') { - // iterate over channels separately - for (const id in cur[key]) { - if (cur[key][id] === prev[key][id]) continue; - upsert(db.channels, cur[key][id]); - } - } else upsert(db.state, { _id: key, value: cur[key] }); - // notice we don't handle deleted values: the set of top-level keys are constant, - // oldChannels aren't deleted, and current channels are only moved to oldChannels, - // which share the [channelUniqueKey], so they get replaced - } - }), - ignoreElements(), - ); /** * Shutdown database instance when raiden shuts down diff --git a/raiden-ts/src/db/types.ts b/raiden-ts/src/db/types.ts index 582ab16202..7f1ecc166d 100644 --- a/raiden-ts/src/db/types.ts +++ b/raiden-ts/src/db/types.ts @@ -60,4 +60,5 @@ export interface RaidenDatabase { state: Loki.Collection; channels: Loki.Collection; transfers: Loki.Collection; + transfersKeys: Set; } diff --git a/raiden-ts/src/db/utils.ts b/raiden-ts/src/db/utils.ts index 1445f75a65..a790d81266 100644 --- a/raiden-ts/src/db/utils.ts +++ b/raiden-ts/src/db/utils.ts @@ -16,7 +16,7 @@ import { Channel } from '../channels'; import { channelKey, channelUniqueKey } from '../channels/utils'; import { last, Address } from '../utils/types'; -import { TransferState } from '../transfers/state'; +import { Direction, TransferState } from '../transfers/state'; import { RaidenStorage, Migrations, @@ -26,7 +26,7 @@ import { RaidenDatabase, StateMember, } from './types'; -import { getDefaultPouchAdapter, LokiRaidenAdapter } from './adapter'; +import { byPrefix, getDefaultPouchAdapter, LokiRaidenAdapter } from './adapter'; import defaultMigrations from './migrations'; /** @@ -110,7 +110,17 @@ async function makeDatabase(storage: RaidenStorage): Promise { }); }); - return { storage, db, state, channels, transfers }; + // populate transfersKeys with all keys from database, since loki only keep track of a subset + // of the transfers + const transfersKeys = new Set(); + const [sentTransfers, receivedTransfers] = await Promise.all([ + storage.allDocs(byPrefix(`${Direction.SENT}:`)), + storage.allDocs(byPrefix(`${Direction.RECEIVED}:`)), + ]); + sentTransfers.rows.forEach(({ id }) => transfersKeys.add(id)); + receivedTransfers.rows.forEach(({ id }) => transfersKeys.add(id)); + + return { storage, db, state, channels, transfers, transfersKeys }; } /** @@ -180,7 +190,7 @@ export function get$( * @param db - Database to query state from * @returns mapping object potentially decodable to RaidenState */ -export async function getRaidenState(db: RaidenDatabase): Promise { +export function getRaidenState(db: RaidenDatabase): any | undefined { const state = { channels: {}, oldChannels: {} } as any; for (const { _id, value } of db.state.find()) { @@ -198,13 +208,12 @@ export async function getRaidenState(db: RaidenDatabase): Promise { +export function putRaidenState(db: RaidenDatabase, state: RaidenState): void { for (const [key, value] of Object.entries(state)) { if (key === 'channels' || key === 'oldChannels') { for (const channel of Object.values(value)) { diff --git a/raiden-ts/src/helpers.ts b/raiden-ts/src/helpers.ts index 9b66c4ef57..3b34a2c8fa 100644 --- a/raiden-ts/src/helpers.ts +++ b/raiden-ts/src/helpers.ts @@ -477,10 +477,10 @@ export async function getState( db = await migrateDatabase.call(dbCtor, dbName); } - let state = await getRaidenState(db); + let state = getRaidenState(db); if (!state) { state = makeInitialState({ network, address, contractsInfo: contractsInfo }); - await putRaidenState(db, state); + putRaidenState(db, state); } else { state = decode(RaidenState, state); } diff --git a/raiden-ts/src/persister.ts b/raiden-ts/src/persister.ts new file mode 100644 index 0000000000..e092c393d7 --- /dev/null +++ b/raiden-ts/src/persister.ts @@ -0,0 +1,91 @@ +/** + * This file introduces a 'persister' middleware for redux + * + * It's coupled with RaidenDatabase, and runs a root _reducer-like_ function for each action/state + * change going through redux state machine. + * The function receives the RaidenDatabase instance, a tuple containing the current and previous + * state, and the action which triggered this change. Like reducers, the function **must not** + * change state, but in this case, return value is also ignored. Instead, it should do whatever + * logic it needs to persist the new state on the database. Redux-state changes should still be + * performed on reducers, as usual. + * This is useful as a reducer-like synchronous function for members of the state which aren't + * kept in the state, but on the database instead, or to sync/persist state changes to the + * database storage. + */ + +import { Dispatch, Middleware } from 'redux'; +import type { RaidenState } from './state'; +import type { RaidenAction } from './actions'; +import type { RaidenDatabase } from './db/types'; +import { upsert } from './db/utils'; + +import { transfersPersister } from './transfers/persister'; + +/** + * Persister function type: receives RaidenDatabase, new/prev state tuple, and action + */ +export type RaidenPersister = ( + db: RaidenDatabase, + states: readonly [currentState: RaidenState, prevState: RaidenState], + action: RaidenAction, +) => void; + +/** + * Raiden root persister: called by the persister middleware. Should call sub-persisters if needed + * Persisters should be synchronous, since they block the state machine. + * + * @param db - RaidenDatabase to persist/sync state on + * @param states - Pair of new/prev RaidenStates + * @param action - Action which got dispatched through redux + */ +export const raidenRootPersister: RaidenPersister = (db, states, action) => { + const [state, prev] = states; + if (state !== prev) { + for (const k in state) { + const key = k as keyof RaidenState; + // key has same value, pass over + if (state[key] === prev[key]) continue; + else if (key === 'channels' || key === 'oldChannels') { + // iterate over channels separately + for (const id in state[key]) { + if (state[key][id] === prev[key][id]) continue; + upsert(db.channels, state[key][id]); + } + } else upsert(db.state, { _id: key, value: state[key] }); + // notice we don't handle deleted values: the set of top-level keys are constant, + // oldChannels aren't deleted, and current channels are only moved to oldChannels, + // which share the [channelUniqueKey], so they get replaced + } + } + + // sub persisters + for (const persister of [transfersPersister]) persister(db, states, action); +}; + +/** + * Create a raiden persister middleware for redux. + * It calls a given root persister every time an action goes through Redux state machine, passing + * the db RaidenDatabase, new and previous states, and the action. + * The persister gets called after the action reduced the state. + * + * @param db - Raiden Database object, passed to persister + * @param persister - Root persister to run + * @returns Middleware function to be applied to redux + */ +export function createPersisterMiddleware( + db: RaidenDatabase, + persister: RaidenPersister = raidenRootPersister, +): Middleware> { + const log = db.storage.constructor.__defaults.log; + return (store) => (next) => (action) => { + const prevState = store.getState(); + const result = next(action); + const state = store.getState(); + try { + persister(db, [state, prevState], action); + } catch (err) { + log?.warn?.('Persister error', err); + } + return result; + }; +} diff --git a/raiden-ts/src/raiden.ts b/raiden-ts/src/raiden.ts index c560e6a6fa..6c94bb5e52 100644 --- a/raiden-ts/src/raiden.ts +++ b/raiden-ts/src/raiden.ts @@ -82,6 +82,7 @@ import { import { RaidenError, ErrorCodes } from './utils/error'; import { RaidenDatabase } from './db/types'; import { get$, dumpDatabaseToArray } from './db/utils'; +import { createPersisterMiddleware } from './persister'; export class Raiden { private readonly store: Store; @@ -274,12 +275,13 @@ export class Raiden { RaidenState, RaidenEpicDeps >({ dependencies: this.deps }); + const persisterMiddleware = createPersisterMiddleware(db); this.store = createStore( raidenReducer, // workaround for redux's PreloadedState issues with branded values state as any, // eslint-disable-line @typescript-eslint/no-explicit-any - applyMiddleware(loggerMiddleware, this.epicMiddleware), + applyMiddleware(loggerMiddleware, this.epicMiddleware, persisterMiddleware), ); // populate deps.latest$, to ensure config, logger && pollingInterval are setup before start diff --git a/raiden-ts/src/transfers/epics/db.ts b/raiden-ts/src/transfers/epics/db.ts deleted file mode 100644 index 6eb6be8ab4..0000000000 --- a/raiden-ts/src/transfers/epics/db.ts +++ /dev/null @@ -1,188 +0,0 @@ -import { Observable } from 'rxjs'; -import { filter, ignoreElements, withLatestFrom, tap } from 'rxjs/operators'; -import pick from 'lodash/fp/pick'; - -import { RaidenAction } from '../../actions'; -import { RaidenState } from '../../state'; -import { RaidenEpicDeps } from '../../types'; -import { isActionOf } from '../../utils/actions'; -import { timed } from '../../utils/types'; -import { channelKey, channelUniqueKey } from '../../channels/utils'; -import { ChannelState } from '../../channels/state'; -import { channelClose, channelSettle } from '../../channels/actions'; -import { - transferSecret, - transferSecretRegister, - transferSigned, - transferUnlock, - transferExpire, - transferSecretRequest, - transferSecretReveal, - transferProcessed, - transferUnlockProcessed, - transferExpireProcessed, -} from '../actions'; -import { transferKey } from '../utils'; -import { TransferState, Direction } from '../state'; - -const END = { [Direction.SENT]: 'own', [Direction.RECEIVED]: 'partner' } as const; - -function channelKeyFromTransfer( - transfer: TransferState | transferSigned | transferUnlock.success | transferExpire.success, -) { - let tokenNetwork, partner; - if ('type' in transfer) { - tokenNetwork = transfer.payload.message.token_network_address; - partner = transfer.payload.partner; - } else { - tokenNetwork = transfer.transfer.token_network_address; - partner = transfer.partner; - } - return channelKey({ tokenNetwork, partner }); -} - -export const transferSignedReducerEpic = ( - action$: Observable, - state$: Observable, - { db }: RaidenEpicDeps, -): Observable => - action$.pipe( - filter(transferSigned.is), - withLatestFrom(state$), - // only proceed if channel is open and nonce is current - filter(([action, state]) => { - const channel = state.channels[channelKeyFromTransfer(action)]; - return ( - channel?.state === ChannelState.open && - // here, action was already handled by reducer; if it fails, this is an old action, ignore - channel[END[action.meta.direction]].balanceProof.nonce.eq(action.payload.message.nonce) - ); - }), - tap(([action]) => - // insert will fail if there's already a transfer with 'key' - db.transfers.insert({ - _id: transferKey(action.meta), - channel: channelUniqueKey({ - id: action.payload.message.channel_identifier.toNumber(), - tokenNetwork: action.payload.message.token_network_address, - partner: action.payload.partner, - }), - ...action.meta, // direction, secrethash - expiration: action.payload.message.lock.expiration.toNumber(), - transfer: timed(action.payload.message), - partner: action.payload.partner, - fee: action.payload.fee, - }), - ), - ignoreElements(), - ); - -export const transferCompletedReducerEpic = ( - action$: Observable, - state$: Observable, - { db }: RaidenEpicDeps, -): Observable => - action$.pipe( - filter(isActionOf([transferUnlock.success, transferExpire.success])), - withLatestFrom(state$), - // only proceed if channel is open and nonce is current - filter(([action, state]) => { - const channel = state.channels[channelKeyFromTransfer(action)]; - return ( - channel?.state === ChannelState.open && - // here, action was already handled by reducer; if it fails, this is an old action, ignore - channel[END[action.meta.direction]].balanceProof.nonce.eq(action.payload.message.nonce) - ); - }), - tap(([action]) => { - const field = transferUnlock.success.is(action) ? 'unlock' : 'expired'; - const doc = db.transfers.by('_id', transferKey(action.meta)); - if (!doc || doc[field]) return; - db.transfers.update({ ...doc, [field]: timed(action.payload.message) }); - }), - ignoreElements(), - ); - -export const transferSecretReducerEpic = ( - action$: Observable, - {}: Observable, - { db }: RaidenEpicDeps, -): Observable => - action$.pipe( - filter(isActionOf([transferSecret, transferSecretRegister.success])), - tap((action) => { - const doc = db.transfers.by('_id', transferKey(action.meta)); - if (!doc) return; - // unconfirmed transferSecretRegister is handled as transferSecret: - // acknowledge secret, but don't set it as registered yet - if (transferSecret.is(action) || action.payload.confirmed === undefined) { - if (doc.secret) return; // only update if secret is unset - } else { - // update if secret is unset or registerBlock needs update - if (doc.secret && doc.secretRegistered?.txBlock === action.payload.txBlock) return; - } - db.transfers.update({ - ...doc, - secret: action.payload.secret, - ...(transferSecret.is(action) || action.payload.confirmed === undefined - ? {} - : { secretRegistered: timed(pick(['txHash', 'txBlock'], action.payload)) }), - }); - }), - ignoreElements(), - ); - -const fieldMap = { - [transferSecretRequest.type]: 'secretRequest', - [transferSecretReveal.type]: 'secretReveal', - [transferProcessed.type]: 'transferProcessed', - [transferUnlockProcessed.type]: 'unlockProcessed', - [transferExpireProcessed.type]: 'expiredProcessed', -} as const; - -export const transferMessagesReducerEpic = ( - action$: Observable, - {}: Observable, - { db }: RaidenEpicDeps, -): Observable => - action$.pipe( - filter( - isActionOf([ - transferSecretRequest, - transferSecretReveal, - transferProcessed, - transferUnlockProcessed, - transferExpireProcessed, - ]), - ), - tap((action) => { - const field = fieldMap[action.type]; - const doc = db.transfers.by('_id', transferKey(action.meta)); - if (!doc || (!transferSecretRequest.is(action) && doc[field])) return; - db.transfers.update({ ...doc, [field]: timed(action.payload.message) }); - }), - ignoreElements(), - ); - -export const transferChannelClosedReducerEpic = ( - action$: Observable, - {}: Observable, - { db }: RaidenEpicDeps, -): Observable => - action$.pipe( - filter(isActionOf([channelClose.success, channelSettle.success])), - filter((action) => !!action.payload.confirmed), - tap((action) => { - const field = channelClose.success.is(action) ? 'channelClosed' : 'channelSettled'; - const results = db.transfers.find({ - channel: channelUniqueKey({ id: action.payload.id, ...action.meta }), - }); - for (const doc of results) { - db.transfers.update({ - ...doc, - [field]: timed(pick(['txHash', 'txBlock'], action.payload)), - }); - } - }), - ignoreElements(), - ); diff --git a/raiden-ts/src/transfers/epics/index.ts b/raiden-ts/src/transfers/epics/index.ts index 23b1e250dd..96206b53db 100644 --- a/raiden-ts/src/transfers/epics/index.ts +++ b/raiden-ts/src/transfers/epics/index.ts @@ -1,4 +1,3 @@ -export * from './db'; export * from './expire'; export * from './locked'; export * from './init'; diff --git a/raiden-ts/src/transfers/epics/locked.ts b/raiden-ts/src/transfers/epics/locked.ts index 47dc8bb61b..d3546f531a 100644 --- a/raiden-ts/src/transfers/epics/locked.ts +++ b/raiden-ts/src/transfers/epics/locked.ts @@ -203,8 +203,7 @@ function sendTransferSigned( return combineLatest([state$, deps.config$]).pipe( first(), mergeMap(([state, config]) => { - const doc = deps.db.transfers.by('_id', transferKey(action.meta)); - if (doc) { + if (deps.db.transfersKeys.has(transferKey(action.meta))) { // don't throw to avoid emitting transfer.failure, to just wait for already pending transfer deps.log.warn('transfer already present', action.meta); return EMPTY; @@ -419,19 +418,20 @@ function receiveTransferSigned( first(), mergeMap(([state, { revealTimeout, caps }]) => { const transfer: Signed = action.payload.message; - const doc = db.transfers.by('_id', transferKey(meta)); - if (doc) { - log.warn('transfer already present', action.meta); + if (db.transfersKeys.has(transferKey(meta))) { + log.warn('transfer already present', meta); const msgId = transfer.message_identifier; + const doc = db.transfers.by('_id', transferKey(meta)); // if transfer matches the stored one, re-send Processed once if ( - doc.transferProcessed && + doc?.transferProcessed && doc.partner === action.meta.address && msgId.eq(doc.transfer.message_identifier) ) { // transferProcessed again will trigger messageSend.request return of(transferProcessed({ message: untime(doc.transferProcessed!) }, meta)); - } else return EMPTY; + } + return EMPTY; } // full balance proof validation diff --git a/raiden-ts/src/transfers/persister.ts b/raiden-ts/src/transfers/persister.ts new file mode 100644 index 0000000000..9e480972a0 --- /dev/null +++ b/raiden-ts/src/transfers/persister.ts @@ -0,0 +1,167 @@ +import pick from 'lodash/fp/pick'; + +import type { RaidenPersister } from '../persister'; +import { channelKey, channelUniqueKey } from '../channels/utils'; +import { ChannelState } from '../channels/state'; +import { channelClose, channelSettle } from '../channels/actions'; +import { timed } from '../utils/types'; +import { isActionOf } from '../utils/actions'; +import { transferKey } from './utils'; +import { TransferState, Direction } from './state'; +import { + transferSigned, + transferUnlock, + transferExpire, + transferSecret, + transferSecretRegister, + transferSecretRequest, + transferSecretReveal, + transferProcessed, + transferUnlockProcessed, + transferExpireProcessed, +} from './actions'; + +const END = { [Direction.SENT]: 'own', [Direction.RECEIVED]: 'partner' } as const; + +function channelKeyFromTransfer( + transfer: TransferState | transferSigned | transferUnlock.success | transferExpire.success, +) { + let tokenNetwork, partner; + if ('type' in transfer) { + tokenNetwork = transfer.payload.message.token_network_address; + partner = transfer.payload.partner; + } else { + tokenNetwork = transfer.transfer.token_network_address; + partner = transfer.partner; + } + return channelKey({ tokenNetwork, partner }); +} + +const transferSignedPersister: RaidenPersister = (db, [state], action) => { + if (!transferSigned.is(action)) return; + const channel = state.channels[channelKeyFromTransfer(action)]; + // only proceed if channel is open and nonce is current + if ( + channel?.state !== ChannelState.open || + // here, action was already handled by reducer; if it fails, this is an old action, ignore + !channel[END[action.meta.direction]].balanceProof.nonce.eq(action.payload.message.nonce) + ) + return; + const _id = transferKey(action.meta); + db.transfersKeys.add(_id); + db.transfers.insert({ + _id, + channel: channelUniqueKey({ + id: action.payload.message.channel_identifier.toNumber(), + tokenNetwork: action.payload.message.token_network_address, + partner: action.payload.partner, + }), + ...action.meta, // direction, secrethash + expiration: action.payload.message.lock.expiration.toNumber(), + transfer: timed(action.payload.message), + partner: action.payload.partner, + fee: action.payload.fee, + }); +}; + +const transferUnlockExpirePersister: RaidenPersister = (db, [state], action) => { + if (!isActionOf([transferUnlock.success, transferExpire.success], action)) return; + const channel = state.channels[channelKeyFromTransfer(action)]; + // only proceed if channel is open and nonce is current + if ( + channel?.state !== ChannelState.open || + // here, action was already handled by reducer; if it fails, this is an old action, ignore + !channel[END[action.meta.direction]].balanceProof.nonce.eq(action.payload.message.nonce) + ) + return; + const field = transferUnlock.success.is(action) ? 'unlock' : 'expired'; + const doc = db.transfers.by('_id', transferKey(action.meta)); + if (!doc || doc[field]) return; + db.transfers.update({ ...doc, [field]: timed(action.payload.message) }); +}; + +const transferSecretPersister: RaidenPersister = (db, _, action) => { + if (!isActionOf([transferSecret, transferSecretRegister.success], action)) return; + const doc = db.transfers.by('_id', transferKey(action.meta)); + if (!doc) return; + // unconfirmed transferSecretRegister is handled as transferSecret: + // acknowledge secret, but don't set it as registered yet + if (transferSecret.is(action) || action.payload.confirmed === undefined) { + if (doc.secret) return; // only update if secret is unset + } else { + // update if secret is unset or registerBlock needs update + if (doc.secret && doc.secretRegistered?.txBlock === action.payload.txBlock) return; + } + db.transfers.update({ + ...doc, + secret: action.payload.secret, + ...(transferSecret.is(action) || action.payload.confirmed === undefined + ? {} + : { secretRegistered: timed(pick(['txHash', 'txBlock'], action.payload)) }), + }); +}; + +const fieldMap = { + [transferSecretRequest.type]: 'secretRequest', + [transferSecretReveal.type]: 'secretReveal', + [transferProcessed.type]: 'transferProcessed', + [transferUnlockProcessed.type]: 'unlockProcessed', + [transferExpireProcessed.type]: 'expiredProcessed', +} as const; + +const transferMessagesPersister: RaidenPersister = (db, _, action) => { + if ( + !isActionOf( + [ + transferSecretRequest, + transferSecretReveal, + transferProcessed, + transferUnlockProcessed, + transferExpireProcessed, + ], + action, + ) + ) + return; + const field = fieldMap[action.type]; + const doc = db.transfers.by('_id', transferKey(action.meta)); + if (!doc || (!transferSecretRequest.is(action) && doc[field])) return; + db.transfers.update({ ...doc, [field]: timed(action.payload.message) }); +}; + +const transferChannelClosedPersister: RaidenPersister = (db, _, action) => { + if (!isActionOf([channelClose.success, channelSettle.success], action)) return; + if (!action.payload.confirmed) return; + const field = channelClose.success.is(action) ? 'channelClosed' : 'channelSettled'; + const results = db.transfers.find({ + channel: channelUniqueKey({ id: action.payload.id, ...action.meta }), + }); + for (const doc of results) { + db.transfers.update({ + ...doc, + [field]: timed(pick(['txHash', 'txBlock'], action.payload)), + }); + } +}; + +/** + * Transfers root persister. + * Since transfers states aren't kept in the state, but instead only on the database, they are + * "reduced" and mutated synchronously by these persisters. The synchronous/in-memory database + * (LokiJS) then is responsible for syncing it with the storage database (PouchDB). + * + * @param db - RaidenDatabase + * @param states - new/prev states tuple + * @param action - Raiden action + */ +export const transfersPersister: RaidenPersister = (db, states, action) => { + for (const persister of [ + transferSignedPersister, + transferUnlockExpirePersister, + transferSecretPersister, + transferMessagesPersister, + transferChannelClosedPersister, + ]) { + persister(db, states, action); + } +}; diff --git a/raiden-ts/src/transfers/utils.ts b/raiden-ts/src/transfers/utils.ts index 7600655c80..d34776f4c5 100644 --- a/raiden-ts/src/transfers/utils.ts +++ b/raiden-ts/src/transfers/utils.ts @@ -140,7 +140,7 @@ export function raidenTransfer(state: TransferState): RaidenTransfer { state.unlockProcessed || state.expiredProcessed || state.secretRegistered || - state.channelClosed + state.channelSettled ); return { key: transferKey(state), diff --git a/raiden-ts/tests/unit/mocks.ts b/raiden-ts/tests/unit/mocks.ts index fbd2949595..46ef474a26 100644 --- a/raiden-ts/tests/unit/mocks.ts +++ b/raiden-ts/tests/unit/mocks.ts @@ -69,14 +69,14 @@ import { ShutdownReason, Capabilities } from 'raiden-ts/constants'; import { createEpicMiddleware } from 'redux-observable'; import { raidenRootEpic } from 'raiden-ts/epics'; import { migrateDatabase, putRaidenState, getRaidenState } from 'raiden-ts/db/utils'; -import { RaidenDatabaseConstructor } from 'raiden-ts/db/types'; +import { RaidenStorageConstructor } from 'raiden-ts/db/types'; import { getNetworkName } from 'raiden-ts/utils/ethers'; logging.setLevel(logging.levels.DEBUG); const RaidenPouchDB = PouchDB.defaults({ adapter: 'memory', log: logging, -} as any) as RaidenDatabaseConstructor; +} as any) as RaidenStorageConstructor; export type MockedTransaction = ContractTransaction & { wait: jest.MockedFunction;