Skip to content

Commit

Permalink
feat(demux): implement new Updater and enhance Aloxide handler to han…
Browse files Browse the repository at this point in the history
…dle custom actions (lecle#99)
  • Loading branch information
cymonkey committed Nov 16, 2020
1 parent c3b4f5e commit 00593d2
Show file tree
Hide file tree
Showing 8 changed files with 760 additions and 108 deletions.
106 changes: 104 additions & 2 deletions packages/demux/src/AloxideActionHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EntityConfig } from '@aloxide/bridge/src';
import { AbstractActionHandler } from 'demux';
import { EntityConfig } from '@aloxide/bridge';
import { AbstractActionHandler, BlockInfo, Updater, VersionedAction } from 'demux';

import { indexStateSchema } from './indexStateSchema';

Expand All @@ -13,9 +13,14 @@ import type {
import type { DataAdapter } from './DataAdapter';
import type { DMeta } from './DMeta';
import type { IndexStateModel } from './IndexStateModel';
import { VersatileUpdater } from './VersatileUpdater';

export interface AloxideActionHandlerOptions extends ActionHandlerOptions {
indexStateModelName?: string;
handlers?: {
actionName: string;
handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void;
}[];
}

export interface AloxideActionHandlerContext {
Expand All @@ -37,13 +42,110 @@ export class AloxideActionHandler extends AbstractActionHandler {
super(handlerVersions, options);
if (options) {
this.indexStateModelName = options.indexStateModelName;

// add initial handlers
const handlers = Array.isArray(options.handlers) ? options.handlers : [];

for (const { handler, actionName } of handlers) {
this.addHandler(handler, actionName);
}
}
}

getIndexStateModelName() {
return this.indexStateModelName || `DemuxIndexState_${this.bcName.replace(/\W+/, '_')}`;
}

/**
* @override
* @param candidateType The incoming action's type
* @param subscribedType The type the Updater of Effect is subscribed to
* @param _payload The payload of the incoming Action.
*/
matchActionType(candidateType, subscribedType, _payload?): boolean {
if (subscribedType === '*') {
return true;
}

return candidateType === subscribedType;
}

/**
* @override
* @param state
* @param nextBlock
* @param context
* @param isReplay
*/
applyUpdaters(
state: any,
nextBlock: NextBlock,
context: any,
isReplay: boolean,
): Promise<VersionedAction[]> {
// Add additional data to payload for further handling.
nextBlock.block.actions.forEach(action => {
action.payload.actionType = action.type;
});

return super.applyUpdaters(state, nextBlock, context, isReplay);
}

/**
* Add updater to handle data
* @param updater Updater
*/
addUpdater(updater: Updater) {
// @ts-ignore
if (this.handlerVersionMap) {
// @ts-ignore
const updaters = this.handlerVersionMap[this.handlerVersionName].updaters;
updaters.push(updater);
} else {
throw new Error('"handlerVersionMap" not found');
}
}

/**
* Add custom handler for custom action
* @param handler hanlder function
* @param actionName action name string
*/
addHandler(
handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void,
actionName?: string,
) {
const versatileUpdaters: VersatileUpdater[] = this.getVersatileUpdaters();

if (versatileUpdaters.length === 0) {
this.log.warn(
'"addHandler" is intended to use only with Versatile Updaters which can handle all types of actions (actionType = "*")',
);
return;
}

for (const updater of versatileUpdaters) {
updater.addHandler(handler, actionName);
}

return true;
}

private getVersatileUpdaters(): VersatileUpdater[] {
let updaters: any[] = [];
// @ts-ignore
if (this.handlerVersionMap) {
// @ts-ignore
updaters = this.handlerVersionMap[this.handlerVersionName].updaters;
}

return updaters.filter(
updater =>
(updater instanceof VersatileUpdater || typeof updater.addHandler === 'function') &&
updater.actionType === '*',
);
}

protected updateIndexState(
state: any,
nextBlock: NextBlock,
Expand Down
86 changes: 86 additions & 0 deletions packages/demux/src/VersatileUpdater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import type { ActionCallback, BlockInfo, Updater } from 'demux';
import type { Logger } from './Logger';

export class VersatileUpdater implements Updater {
#_handlersMap: Map<string, symbol[]> = new Map();
actionType: string = '*';
logger?: Logger;

constructor(options: { logger?: Logger; actionType?: string } = {}) {
if (typeof options.actionType === 'string') {
this.actionType = options.actionType;
}
this.logger = options.logger;
}

apply: ActionCallback = (
state: any,
payload: any,
blockInfo: BlockInfo,
context: any,
): Promise<void> => {
const actionName = payload.actionType;

this.handleData(actionName, {
state,
payload,
blockInfo,
context,
});
return Promise.resolve();
};

addHandler(
handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void,
actionName?: string,
) {
if (typeof handler !== 'function') {
throw new Error('"handler" is required and must be a function');
}

if (!actionName) {
actionName = this.actionType;
}

// TODO: enhance this simple check by using regex.
if (typeof actionName !== 'string' || actionName.indexOf('::') < 1) {
throw new Error(
`"actionName" must be a string and must contain account which this action belong to. Ex: "eosio::${actionName}"`,
);
}

if (this.actionType !== '*' && this.actionType !== actionName) {
throw new Error(`This Updater is used to handle "${this.actionType}" action only`);
}

const handlerMap = this.#_handlersMap;
const newSymbol = Symbol(actionName);
this[newSymbol] = handler;

if (handlerMap.has(actionName)) {
handlerMap.get(actionName).push(newSymbol);
} else {
handlerMap.set(actionName, [newSymbol]);
}

return true;
}

protected async handleData(actionName: string, data: any, scope?: any): Promise<any> {
const handlerMap = this.#_handlersMap;
const handlerSymbols: symbol[] = handlerMap.get(actionName) || [];

if (handlerSymbols.length === 0) return;

// Pass custom scope to prevent suspicious handler from modifying real object by .
scope = scope ? scope : {};
const handlerCalls = [];

// Execute all handlers
for (const symbol of handlerSymbols) {
handlerCalls.push(this[symbol].call(scope, data));
}

return Promise.all(handlerCalls);
}
}
9 changes: 7 additions & 2 deletions packages/demux/src/createWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { ActionReader, ActionHandler, HandlerVersion, ActionWatcherOptions
import type { AloxideActionHandlerOptions } from './AloxideActionHandler';
import type { Logger } from './Logger';
import type { AloxideConfig } from '@aloxide/abstraction';
import { VersatileUpdater } from './VersatileUpdater';

export interface CreateWatcherConfig {
/**
Expand All @@ -27,7 +28,7 @@ export interface CreateWatcherConfig {
actionWatcherOptions?: ActionWatcherOptions;
}

export async function createWatcher(config: CreateWatcherConfig): Promise<BaseActionWatcher> {
export function createWatcher(config: CreateWatcherConfig): BaseActionWatcher {
const {
bcName,
accountName,
Expand All @@ -47,7 +48,11 @@ export async function createWatcher(config: CreateWatcherConfig): Promise<BaseAc
handlerVersions = [
new BaseHandlerVersion(
versionName,
createDbUpdater(accountName, dataAdapter, aloxideConfig.entities, logger),
[
...createDbUpdater(accountName, dataAdapter, aloxideConfig.entities, logger),
// Versatile Updater is used to handle all actions by default
new VersatileUpdater(),
],
[],
),
];
Expand Down
Loading

0 comments on commit 00593d2

Please sign in to comment.