Skip to content

Commit

Permalink
refactor queue logic
Browse files Browse the repository at this point in the history
  • Loading branch information
madeindjs committed Aug 28, 2024
1 parent 5da4299 commit 1b2af65
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import {
ComponentPublicInstance,
computed,
readonly,
Ref,
ref,
ShallowRef,
} from "vue";
import { ComponentPublicInstance, computed, Ref, ShallowRef } from "vue";
import { type internal } from "arquero";
import { Core, InstancePath } from "../../../writerTypes";
import { ARQUERO_INTERNAL_ID } from "./constants";
import { useJobs } from "./useJobs";

/**
* Encapsulates the logic to update an Arquero table and sync it with the backend
Expand All @@ -19,17 +13,33 @@ export function useDataFrameValueBroker(
emitterEl: Ref<HTMLElement | ComponentPublicInstance>,
table: ShallowRef<internal.ColumnTable | null>,
) {
const isBusy = ref(false);
const queuedEvent: Ref<{
columnName: string;
rowIndex: number;
value: unknown;
}> = ref(null);

const componentId = instancePath.at(-1).componentId;
const component = computed(() => wf.getComponentById(componentId));

async function handleAddRow(rowIndex: number, direction: -1 | 1) {
type Job =
| {
eventType: "wf-dataframe-add";
payload: Parameters<typeof handlerAddRow>;
}
| {
eventType: "wf-dataframe-update";
payload: Parameters<typeof handlerUpdateCell>;
};

const { push: pushJob, isBusy } = useJobs<Job>(handler);

async function handler(job: Job) {
switch (job.eventType) {
case "wf-dataframe-add":
await handlerAddRow(...job.payload);
break;
case "wf-dataframe-update":
await handlerUpdateCell(...job.payload);
break;
}
}

async function handlerAddRow(rowIndex: number, direction: -1 | 1) {
const eventType = "wf-dataframe-add";
const rowIndexBackend = rowIndex - 1; // 0-based index (arquero is based on 1-based index)

Expand Down Expand Up @@ -75,11 +85,7 @@ export function useDataFrameValueBroker(
});
}

/**
* Takes a value and emits a CustomEvent of the given type.
* Deals with debouncing.
*/
async function handleUpdateCell(
async function handlerUpdateCell(
columnName: string,
rowIndex: number,
value: unknown,
Expand Down Expand Up @@ -115,60 +121,47 @@ export function useDataFrameValueBroker(

if (!isEventUsed(eventType)) return;

if (isBusy.value) {
// Queued event is overwritten for debouncing purposes
queuedEvent.value = { columnName, rowIndex, value };
return;
}

isBusy.value = true;
const callback = () => {
isBusy.value = false;
if (queuedEvent.value) {
handleUpdateCell(
queuedEvent.value.columnName,
queuedEvent.value.rowIndex,
queuedEvent.value.value,
);
queuedEvent.value = null;
}
};

const event = new CustomEvent(eventType, {
detail: {
payload: {
record,
record_index: rowIndexBackend,
return new Promise((res) => {
const event = new CustomEvent(eventType, {
detail: {
payload: { record, record_index: rowIndexBackend },
callback: res,
},
callback,
},
});
});

dispatchEvent(event);
dispatchEvent(event);
});
}

function isEventUsed(eventType: string): boolean {
const isHandlerSet = component.value.handlers?.[eventType];
const isBindingSet = component.value.binding?.eventType == eventType;

// Event is not used
if (!isHandlerSet && !isBindingSet) return false;
return true;
return Boolean(isHandlerSet || isBindingSet);
}

function dispatchEvent(event: CustomEvent) {
if (emitterEl.value instanceof HTMLElement) {
emitterEl.value.dispatchEvent(event);
} else {
// Vue instance (ComponentPublicInstance)
emitterEl.value.$el.dispatchEvent(event);
emitterEl.value.$el.dispatchEvent(event); // Vue instance (ComponentPublicInstance)
}
}

return {
handleUpdateCell,
handleAddRow,
isBusy: readonly(isBusy),
isBusy,

handleUpdateCell: (...args: Parameters<typeof handlerUpdateCell>) =>
pushJob({
eventType: "wf-dataframe-update",
payload: args,
}),

handleAddRow: (...args: Parameters<typeof handlerAddRow>) =>
pushJob({
eventType: "wf-dataframe-add",
payload: args,
}),
};
}

Expand Down
37 changes: 37 additions & 0 deletions src/ui/src/core_components/content/CoreDataframe/useJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { shallowRef, ref, readonly } from "vue";

/**
* A simple FIFO Job queue algorithm
*/
export function useJobs<T>(handler: (value: T) => Promise<void>) {
const jobs = shallowRef<T[]>([]);
const isRunning = ref(false);

async function run() {
if (isRunning.value) return;

isRunning.value = true;

while (jobs.value.length) {
const [job, ...rest] = jobs.value;

try {
await handler(job);
} catch (error) {
// eslint-disable-next-line no-console
console.error("Error during handling job", job);
} finally {
jobs.value = rest;
}
}

isRunning.value = false;
}

function push(job: T) {
jobs.value = [...jobs.value, job];
if (!isRunning.value) return run();
}

return { push, isBusy: readonly(isRunning) };
}

0 comments on commit 1b2af65

Please sign in to comment.