Skip to content

Commit

Permalink
fix(action): fixed concurrency failing due to incorrect tasks reference
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewcourtice committed Dec 12, 2022
1 parent d1d43c4 commit cf73ef0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 52 deletions.
18 changes: 1 addition & 17 deletions extensions/action/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1 @@
import {
INTERNAL,
} from '@harlem/core';

export const SENDER = 'extension:action';
export const STATE_PROP = `${INTERNAL.prefix}actions` as const;

export const MUTATIONS = {
init: 'extension:action:init',
register: 'extension:action:register',
incrementRunCount: 'extension:action:increment-run-count',
addInstance: 'extension:action:add-instance',
removeInstance: 'extension:action:remove-instance',
addError: 'extension:action:add-error',
clearErrors: 'extension:action:clear-errors',
resetState: 'extension:action:reset-state',
} as const;
export const SENDER = 'extension:action';
53 changes: 20 additions & 33 deletions extensions/action/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,49 +71,41 @@ export default function actionExtension<TState extends BaseState>(options?: Part
return (store: InternalStore<TState>) => {
store.register('extensions', 'action', () => rootOptions);

const actionTasks = new Map<string, Set<Task<unknown>>>();
const actionState = reactive(new Map<string, ActionState>());

function setActionState<TPayload = unknown, TResult = unknown>(name: string) {
function setActionState<TPayload = unknown>(name: string) {
const state = {
runCount: 0,
tasks: new Set(),
instances: new Map(),
errors: new Map(),
} as ActionState<TPayload, TResult>;
} as ActionState<TPayload>;

actionState.set(name, state);

return state;
}

function getActionState<TPayload = unknown, TResult = unknown>(name: string) {
return (actionState.get(name) || setActionState(name)) as ActionState<TPayload, TResult>;
}

function updateActionState(name: string, producer: (currentState: ActionState) => Partial<ActionState>) {
const currentState = actionState.get(name);

if (!currentState) {
return;
}

const newState = producer(currentState);

if (newState !== currentState) {
actionState.set(name, {
...currentState,
...newState,
});
}
function getActionState<TPayload = unknown>(name: string) {
return (actionState.get(name) || setActionState(name)) as ActionState<TPayload>;
}

function registerAction(name: string, options: Partial<ActionOptions<any>> = {}) {
store.register('actions', name, () => options);
return setActionState(name);
setActionState(name);

const tasks = new Set<Task<unknown>>();
actionTasks.set(name, tasks);

return {
tasks,
};
}

function action<TPayload, TResult = void>(name: string, body: ActionBody<TState, TPayload, TResult>, options?: Partial<ActionOptions<TPayload>>): Action<TPayload, TResult> {
registerAction(name, options);
const {
tasks,
} = registerAction(name, options);

const {
concurrent,
Expand All @@ -132,16 +124,13 @@ export default function actionExtension<TState extends BaseState>(options?: Part
} as ActionOptions<TPayload>;

const mutate = (mutator: Mutator<TState, undefined, void>) => store.write(name, SENDER, mutator);
const incrementRunCount = () => updateActionState(name, ({ runCount }) => ({
runCount: runCount + 1,
}));
const incrementRunCount = () => getActionState(name).runCount += 1;

return ((payload: TPayload, controller?: AbortController) => {
const {
tasks,
instances,
errors,
} = getActionState<TPayload, TResult>(name);
} = getActionState<TPayload>(name);

if (!concurrent || (typeIsFunction(concurrent) && !concurrent(payload, Array.from(instances.values())))) {
abortAction(name, 'New instance started on non-concurrent action');
Expand Down Expand Up @@ -264,11 +253,9 @@ export default function actionExtension<TState extends BaseState>(options?: Part
([] as string[])
.concat(name)
.forEach(name => {
const {
tasks,
} = getActionState(name);
const tasks = actionTasks.get(name);

if (tasks.size) {
if (tasks?.size) {
tasks.forEach(task => {
task.abort(reason);
tasks.delete(task);
Expand Down
3 changes: 1 addition & 2 deletions extensions/action/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ export interface ActionAbortStrategies {
warn: ActionAbortStrategy;
}

export interface ActionState<TPayload = unknown, TResult = unknown> {
export interface ActionState<TPayload = unknown> {
runCount: number;
tasks: Set<Task<TResult>>;
instances: Map<symbol, TPayload>;
errors: Map<symbol, unknown>;
}
Expand Down

0 comments on commit cf73ef0

Please sign in to comment.