Skip to content

Commit

Permalink
sdk: replace reducer epics with persister middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
andrevmatos committed Sep 5, 2020
1 parent e799c30 commit 66aea7f
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 252 deletions.
2 changes: 1 addition & 1 deletion raiden-ts/src/db/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ let defaultPouchAdapter: string;
* @param descending - Wether to swap start & endkey for reverse reverse search
* @returns allDocs's options to fetch all documents which keys start with prefix
*/
function byPrefix(prefix: string, descending = false) {
export function byPrefix(prefix: string, descending = false) {
const start = prefix;
const end = prefix + '\ufff0';
return !descending
Expand Down
43 changes: 1 addition & 42 deletions raiden-ts/src/db/epics.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,9 @@
import { Observable } from 'rxjs';
import { distinctUntilChanged, ignoreElements, pairwise, finalize, tap } from 'rxjs/operators';
import { ignoreElements, finalize } from 'rxjs/operators';

import { RaidenAction } from '../actions';
import { RaidenState } from '../state';
import { RaidenEpicDeps } from '../types';
import { upsert } from './utils';

/**
* Update state based on actions and state changes
*
* For certain common actions with trivial reduced state side-effects, it may try to produce an
* optimized update command. Otherwise, the whole state will be upserted.
*
* @param action$ - Observable of RaidenActions
* @param state$ - Observable of RaidenStates
* @param deps - Epics dependencies
* @param deps.db - Database instance
* @returns observable to persist state changes to db
*/
export const dbStateEpic = (
{}: Observable<RaidenAction>,
state$: Observable<RaidenState>,
{ db }: RaidenEpicDeps,
): Observable<never> =>
state$.pipe(
distinctUntilChanged(),
pairwise(),
tap(([prev, cur]) => {
for (const k in cur) {
const key = k as keyof RaidenState;
// key has same value, pass over
if (cur[key] === prev[key]) continue;
else if (key === 'channels' || key === 'oldChannels') {
// iterate over channels separately
for (const id in cur[key]) {
if (cur[key][id] === prev[key][id]) continue;
upsert(db.channels, cur[key][id]);
}
} else upsert(db.state, { _id: key, value: cur[key] });
// notice we don't handle deleted values: the set of top-level keys are constant,
// oldChannels aren't deleted, and current channels are only moved to oldChannels,
// which share the [channelUniqueKey], so they get replaced
}
}),
ignoreElements(),
);

/**
* Shutdown database instance when raiden shuts down
Expand Down
1 change: 1 addition & 0 deletions raiden-ts/src/db/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,5 @@ export interface RaidenDatabase {
state: Loki.Collection<StateMember>;
channels: Loki.Collection<Channel>;
transfers: Loki.Collection<TransferState>;
transfersKeys: Set<string>;
}
23 changes: 16 additions & 7 deletions raiden-ts/src/db/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { Channel } from '../channels';
import { channelKey, channelUniqueKey } from '../channels/utils';
import { last, Address } from '../utils/types';

import { TransferState } from '../transfers/state';
import { Direction, TransferState } from '../transfers/state';
import {
RaidenStorage,
Migrations,
Expand All @@ -26,7 +26,7 @@ import {
RaidenDatabase,
StateMember,
} from './types';
import { getDefaultPouchAdapter, LokiRaidenAdapter } from './adapter';
import { byPrefix, getDefaultPouchAdapter, LokiRaidenAdapter } from './adapter';
import defaultMigrations from './migrations';

/**
Expand Down Expand Up @@ -110,7 +110,17 @@ async function makeDatabase(storage: RaidenStorage): Promise<RaidenDatabase> {
});
});

return { storage, db, state, channels, transfers };
// populate transfersKeys with all keys from database, since loki only keep track of a subset
// of the transfers
const transfersKeys = new Set<string>();
const [sentTransfers, receivedTransfers] = await Promise.all([
storage.allDocs(byPrefix(`${Direction.SENT}:`)),
storage.allDocs(byPrefix(`${Direction.RECEIVED}:`)),
]);
sentTransfers.rows.forEach(({ id }) => transfersKeys.add(id));
receivedTransfers.rows.forEach(({ id }) => transfersKeys.add(id));

return { storage, db, state, channels, transfers, transfersKeys };
}

/**
Expand Down Expand Up @@ -180,7 +190,7 @@ export function get$<T extends { _id: string } = { _id: string }>(
* @param db - Database to query state from
* @returns mapping object potentially decodable to RaidenState
*/
export async function getRaidenState(db: RaidenDatabase): Promise<any | undefined> {
export function getRaidenState(db: RaidenDatabase): any | undefined {
const state = { channels: {}, oldChannels: {} } as any;

for (const { _id, value } of db.state.find()) {
Expand All @@ -198,13 +208,12 @@ export async function getRaidenState(db: RaidenDatabase): Promise<any | undefine
* Like [[dbStateEpic]], stores each key of RaidenState as independent value on the database,
* prefixed * with 'state.', to make it cheaper to save changes touching only a subset of the state.
* 'channels' (being a special hotpath of complex objects) are split as one entry per channel.
* Used to store initial state
* Used to store initial state (on empty db)
*
* @param db - Database to store state into
* @param state - State to persist
* @returns Promise to void
*/
export async function putRaidenState(db: RaidenDatabase, state: RaidenState): Promise<void> {
export function putRaidenState(db: RaidenDatabase, state: RaidenState): void {
for (const [key, value] of Object.entries(state)) {
if (key === 'channels' || key === 'oldChannels') {
for (const channel of Object.values<Channel>(value)) {
Expand Down
4 changes: 2 additions & 2 deletions raiden-ts/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,10 +477,10 @@ export async function getState(
db = await migrateDatabase.call(dbCtor, dbName);
}

let state = await getRaidenState(db);
let state = getRaidenState(db);
if (!state) {
state = makeInitialState({ network, address, contractsInfo: contractsInfo });
await putRaidenState(db, state);
putRaidenState(db, state);
} else {
state = decode(RaidenState, state);
}
Expand Down
91 changes: 91 additions & 0 deletions raiden-ts/src/persister.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* This file introduces a 'persister' middleware for redux
*
* It's coupled with RaidenDatabase, and runs a root _reducer-like_ function for each action/state
* change going through redux state machine.
* The function receives the RaidenDatabase instance, a tuple containing the current and previous
* state, and the action which triggered this change. Like reducers, the function **must not**
* change state, but in this case, return value is also ignored. Instead, it should do whatever
* logic it needs to persist the new state on the database. Redux-state changes should still be
* performed on reducers, as usual.
* This is useful as a reducer-like synchronous function for members of the state which aren't
* kept in the state, but on the database instead, or to sync/persist state changes to the
* database storage.
*/

import { Dispatch, Middleware } from 'redux';
import type { RaidenState } from './state';
import type { RaidenAction } from './actions';
import type { RaidenDatabase } from './db/types';
import { upsert } from './db/utils';

import { transfersPersister } from './transfers/persister';

/**
* Persister function type: receives RaidenDatabase, new/prev state tuple, and action
*/
export type RaidenPersister = (
db: RaidenDatabase,
states: readonly [currentState: RaidenState, prevState: RaidenState],
action: RaidenAction,
) => void;

/**
* Raiden root persister: called by the persister middleware. Should call sub-persisters if needed
* Persisters should be synchronous, since they block the state machine.
*
* @param db - RaidenDatabase to persist/sync state on
* @param states - Pair of new/prev RaidenStates
* @param action - Action which got dispatched through redux
*/
export const raidenRootPersister: RaidenPersister = (db, states, action) => {
const [state, prev] = states;
if (state !== prev) {
for (const k in state) {
const key = k as keyof RaidenState;
// key has same value, pass over
if (state[key] === prev[key]) continue;
else if (key === 'channels' || key === 'oldChannels') {
// iterate over channels separately
for (const id in state[key]) {
if (state[key][id] === prev[key][id]) continue;
upsert(db.channels, state[key][id]);
}
} else upsert(db.state, { _id: key, value: state[key] });
// notice we don't handle deleted values: the set of top-level keys are constant,
// oldChannels aren't deleted, and current channels are only moved to oldChannels,
// which share the [channelUniqueKey], so they get replaced
}
}

// sub persisters
for (const persister of [transfersPersister]) persister(db, states, action);
};

/**
* Create a raiden persister middleware for redux.
* It calls a given root persister every time an action goes through Redux state machine, passing
* the db RaidenDatabase, new and previous states, and the action.
* The persister gets called after the action reduced the state.
*
* @param db - Raiden Database object, passed to persister
* @param persister - Root persister to run
* @returns Middleware function to be applied to redux
*/
export function createPersisterMiddleware(
db: RaidenDatabase,
persister: RaidenPersister = raidenRootPersister,
): Middleware<undefined, RaidenState, Dispatch<RaidenAction>> {
const log = db.storage.constructor.__defaults.log;
return (store) => (next) => (action) => {
const prevState = store.getState();
const result = next(action);
const state = store.getState();
try {
persister(db, [state, prevState], action);
} catch (err) {
log?.warn?.('Persister error', err);
}
return result;
};
}
4 changes: 3 additions & 1 deletion raiden-ts/src/raiden.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import {
import { RaidenError, ErrorCodes } from './utils/error';
import { RaidenDatabase } from './db/types';
import { get$, dumpDatabaseToArray } from './db/utils';
import { createPersisterMiddleware } from './persister';

export class Raiden {
private readonly store: Store<RaidenState, RaidenAction>;
Expand Down Expand Up @@ -274,12 +275,13 @@ export class Raiden {
RaidenState,
RaidenEpicDeps
>({ dependencies: this.deps });
const persisterMiddleware = createPersisterMiddleware(db);

this.store = createStore(
raidenReducer,
// workaround for redux's PreloadedState issues with branded values
state as any, // eslint-disable-line @typescript-eslint/no-explicit-any
applyMiddleware(loggerMiddleware, this.epicMiddleware),
applyMiddleware(loggerMiddleware, this.epicMiddleware, persisterMiddleware),
);

// populate deps.latest$, to ensure config, logger && pollingInterval are setup before start
Expand Down
Loading

0 comments on commit 66aea7f

Please sign in to comment.