Skip to content

Commit

Permalink
Schema load sequence (finos#955)
Browse files Browse the repository at this point in the history
* cache table meta requests with promise to avoid multiple server requests

* ensure table meta is loaded before client is notified of subcribe

table meta for all tables was being requested from server every
time any hook asked for the table data. This was happening twice
at startup. Now we cache the metadata.
If we didn't have table meta at point where CREATE_VP_SUCCESS was
handled, we notified client anyway. This caused tables to be rendered
with no type information, so alignment was wrong on numerics.

* fix all data tests
  • Loading branch information
heswell authored Nov 12, 2023
1 parent 3f5b47d commit 8e9ffff
Show file tree
Hide file tree
Showing 9 changed files with 911 additions and 777 deletions.
4 changes: 1 addition & 3 deletions vuu-ui/packages/vuu-data-react/src/hooks/useVuuTables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ export const useVuuTables = () => {
const { tables } = await server.getTableList();
const tableSchemas = buildTables(
await Promise.all(
tables.map((tableDescriptor) =>
server.getTableSchema(tableDescriptor)
)
tables.map((vuuTable) => server.getTableSchema(vuuTable))
)
);
setTables(tableSchemas);
Expand Down
2 changes: 1 addition & 1 deletion vuu-ui/packages/vuu-data/src/connection-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ const connectedServerAPI: ServerAPI = {
) => asyncRequest<T>(message),

getTableList: async () =>
asyncRequest<VuuTableList>({ type: Message.GET_TABLE_LIST }),
asyncRequest<VuuTableList>({ type: "GET_TABLE_LIST" }),

getTableSchema: async (table) =>
asyncRequest<TableSchema>({
Expand Down
5 changes: 0 additions & 5 deletions vuu-ui/packages/vuu-data/src/server-proxy/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ export const CLOSE_TREE_SUCCESS = "CLOSE_TREE_SUCCESS";
export const CLOSE_TREE_REJECT = "CLOSE_TREE_REJECT";
export const CREATE_VISUAL_LINK = "CREATE_VISUAL_LINK";
export const CREATE_VP = "CREATE_VP";
export const CREATE_VP_SUCCESS = "CREATE_VP_SUCCESS";
export const DISABLE_VP = "DISABLE_VP";
export const DISABLE_VP_SUCCESS = "DISABLE_VP_SUCCESS";
export const DISABLE_VP_REJECT = "DISABLE_VP_REJECT";
export const ENABLE_VP = "ENABLE_VP";
export const ENABLE_VP_SUCCESS = "ENABLE_VP_SUCCESS";
export const ENABLE_VP_REJECT = "ENABLE_VP_REJECT";
export const GET_TABLE_LIST = "GET_TABLE_LIST";
export const GET_TABLE_META = "GET_TABLE_META";
export const GET_VP_VISUAL_LINKS = "GET_VP_VISUAL_LINKS";
export const GET_VIEW_PORT_MENUS = "GET_VIEW_PORT_MENUS";
Expand All @@ -25,7 +23,6 @@ export const VIEW_PORT_MENU_REJ = "VIEW_PORT_MENU_REJ";
export const HB = "HB";
export const HB_RESP = "HB_RESP";
export const LOGIN = "LOGIN";
export const LOGIN_SUCCESS = "LOGIN_SUCCESS";
export const OPEN_TREE_NODE = "OPEN_TREE_NODE";
export const OPEN_TREE_SUCCESS = "OPEN_TREE_SUCCESS";
export const OPEN_TREE_REJECT = "OPEN_TREE_REJECT";
Expand All @@ -36,8 +33,6 @@ export const RPC_RESP = "RPC_RESP";
export const MENU_RPC_RESP = "MENU_RPC_RESP";
export const SET_SELECTION = "SET_SELECTION";
export const SET_SELECTION_SUCCESS = "SET_SELECTION_SUCCESS";
export const TABLE_META_RESP = "TABLE_META_RESP";
export const TABLE_LIST_RESP = "TABLE_LIST_RESP";

export const TABLE_ROW = "TABLE_ROW";
export const SIZE = "SIZE";
Expand Down
261 changes: 125 additions & 136 deletions vuu-ui/packages/vuu-data/src/server-proxy/server-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {
ClientToServerMenuRPC,
ClientToServerMessage,
LinkDescriptorWithLabel,
ServerToClientCreateViewPortSuccess,
ServerToClientMessage,
ServerToClientTableList,
ServerToClientTableMeta,
VuuLinkDescriptor,
VuuRpcRequest,
Expand Down Expand Up @@ -33,8 +35,6 @@ import {
OpenDialogAction,
ServerProxySubscribeMessage,
VuuUIMessageIn,
VuuUIMessageInTableList,
VuuUIMessageInTableMeta,
VuuUIMessageOut,
VuuUIMessageOutAggregate,
VuuUIMessageOutCloseTreeNode,
Expand Down Expand Up @@ -125,11 +125,15 @@ export class ServerProxy {
private authToken = "";
private user = "user";
private pendingLogin?: PendingLogin;
private pendingTableMetaRequests = new Map<string, string>();
private pendingRequests = new Map<string, PendingRequest>();
private sessionId?: string;
private queuedRequests: Array<ClientToServerMessage["body"]> = [];
private cachedTableSchemas: Map<string, Readonly<TableSchema>> = new Map();
private cachedTableMetaRequests: Map<
string,
Promise<ServerToClientTableMeta>
> = new Map();
private cachedTableSchemas: Map<string, TableSchema> = new Map();
private tableList: Promise<ServerToClientTableList> | undefined;

constructor(connection: Connection, callback: PostMessageToClientCallback) {
this.connection = connection;
Expand Down Expand Up @@ -188,32 +192,84 @@ export class ServerProxy {
public subscribe(message: ServerProxySubscribeMessage) {
// guard against subscribe message when a viewport is already subscribed
if (!this.mapClientToServerViewport.has(message.viewport)) {
if (
!this.hasSchemaForTable(message.table) &&
// A Session table is never cached - it is limited to a single workflow interaction
// The metadata for a session table is requested even before the subscribe call.
!isSessionTable(message.table)
) {
info?.(
`subscribe to ${message.table.table}, no metadata yet, request metadata`
);
const requestId = nextRequestId();
this.sendMessageToServer(
{ type: "GET_TABLE_META", table: message.table },
requestId
);
this.pendingTableMetaRequests.set(requestId, message.viewport);
}
const pendingTableSchema = this.getTableMeta(message.table);
// if (
const viewport = new Viewport(message, this.postMessageToClient);
this.viewports.set(message.viewport, viewport);
// use client side viewport id as request id, so that when we process the response,
// Use client side viewport id as request id, so that when we process the response,
// which will provide the serverside viewport id, we can establish a mapping between
// the two
this.sendIfReady(
//TODO handle CREATE_VP error, but server does not send it at the moment
const pendingSubscription = this.awaitResponseToMessage(
viewport.subscribe(),
message.viewport,
this.sessionId !== ""
message.viewport
);
const awaitPendingReponses = Promise.all([
pendingSubscription,
pendingTableSchema,
]) as Promise<
[ServerToClientCreateViewPortSuccess, TableSchema | undefined]
>;
awaitPendingReponses.then(([subscribeResponse, tableSchema]) => {
const { viewPortId: serverViewportId } = subscribeResponse;
const { status: viewportStatus } = viewport;

// switch storage key from client viewportId to server viewportId
if (message.viewport !== serverViewportId) {
this.viewports.delete(message.viewport);
this.viewports.set(serverViewportId, viewport);
}
this.mapClientToServerViewport.set(message.viewport, serverViewportId);

const clientResponse = viewport.handleSubscribed(
subscribeResponse,
tableSchema
);
if (clientResponse) {
this.postMessageToClient(clientResponse);
if (debugEnabled) {
debug(
`post DataSourceSubscribedMessage to client: ${JSON.stringify(
clientResponse
)}`
);
}
}

// In the case of a reconnect, we may have resubscribed a disabled viewport,
// reset the disabled state on server
if (viewport.disabled) {
this.disableViewport(viewport);
}

if (
viewportStatus === "subscribing" &&
// A session table will never have Visual Links, nor Context Menus
!isSessionTable(viewport.table)
) {
// If status is "resubscribing", the following is unnecessary
this.sendMessageToServer({
type: Message.GET_VP_VISUAL_LINKS,
vpId: serverViewportId,
});
this.sendMessageToServer({
type: Message.GET_VIEW_PORT_MENUS,
vpId: serverViewportId,
});

// Resend requests for links from other viewports already on page, they may be linkable to this viewport
Array.from(this.viewports.entries())
.filter(
([id, { disabled }]) => id !== serverViewportId && !disabled
)
.forEach(([vpId]) => {
this.sendMessageToServer({
type: Message.GET_VP_VISUAL_LINKS,
vpId,
});
});
}
});
} else {
error(`spurious subscribe call ${message.viewport}`);
}
Expand Down Expand Up @@ -558,13 +614,33 @@ export class ServerProxy {
} else {
const { type, requestId } = message;
switch (type) {
case "GET_TABLE_LIST":
return this.sendMessageToServer({ type }, requestId);
case "GET_TABLE_META":
return this.sendMessageToServer(
{ type, table: message.table },
case "GET_TABLE_LIST": {
this.tableList ??= this.awaitResponseToMessage(
{ type },
requestId
);
) as Promise<ServerToClientTableList>;
this.tableList.then((response) => {
this.postMessageToClient({
type: "TABLE_LIST_RESP",
tables: response.tables,
requestId,
});
});
return;
}

case "GET_TABLE_META": {
this.getTableMeta(message.table, requestId).then((tableSchema) => {
if (tableSchema) {
this.postMessageToClient({
type: "TABLE_META_RESP",
tableSchema,
requestId,
});
}
});
return;
}
case "RPC_CALL":
return this.rpcCall(message);
default:
Expand All @@ -577,11 +653,27 @@ export class ServerProxy {
);
}

private getTableMeta(table: VuuTable, requestId = nextRequestId()) {
if (isSessionTable(table)) {
return Promise.resolve(undefined);
}
const key = `${table.module}:${table.table}`;
let tableMetaRequest = this.cachedTableMetaRequests.get(key);
if (!tableMetaRequest) {
tableMetaRequest = this.awaitResponseToMessage(
{ type: "GET_TABLE_META", table },
requestId
) as Promise<ServerToClientTableMeta>;
this.cachedTableMetaRequests.set(key, tableMetaRequest);
}
return tableMetaRequest?.then((response) => this.cacheTableMeta(response));
}

private awaitResponseToMessage(
message: ClientToServerBody
message: ClientToServerBody,
requestId = nextRequestId()
): Promise<unknown> {
return new Promise((resolve, reject) => {
const requestId = nextRequestId();
this.sendMessageToServer(message, requestId);
this.pendingRequests.set(requestId, { reject, resolve });
});
Expand Down Expand Up @@ -642,7 +734,7 @@ export class ServerProxy {
);
break;

case Message.LOGIN_SUCCESS:
case "LOGIN_SUCCESS":
if (sessionId) {
this.sessionId = sessionId;
// we should tear down the pending Login now
Expand All @@ -654,68 +746,6 @@ export class ServerProxy {
break;
// TODO login rejected

case Message.CREATE_VP_SUCCESS:
{
const viewport = viewports.get(requestId);
// The clientViewportId was used as requestId for CREATE_VP message. From this point,
// we will key viewports using serverViewPortId and maintain a mapping between client
// and server viewport ids.
if (viewport) {
const { status: viewportStatus } = viewport;
const { viewPortId: serverViewportId } = body;

if (requestId !== serverViewportId) {
viewports.delete(requestId);
viewports.set(serverViewportId, viewport);
}
this.mapClientToServerViewport.set(requestId, serverViewportId);
const response = viewport.handleSubscribed(body);
if (response) {
this.postMessageToClient(response);
if (debugEnabled) {
debug(
`post DataSourceSubscribedMessage to client: ${JSON.stringify(
response
)}`
);
}
}
// In the case of a reconnect, we may have resubscribed a disabled viewport,
// reset the disabled state on server
if (viewport.disabled) {
this.disableViewport(viewport);
}
if (
viewportStatus === "subscribing" &&
// A session table will never have Visual Links, nor Context Menus
!isSessionTable(viewport.table)
) {
// If status is "resubscribing", the following is unnecessary
this.sendMessageToServer({
type: Message.GET_VP_VISUAL_LINKS,
vpId: serverViewportId,
});
this.sendMessageToServer({
type: Message.GET_VIEW_PORT_MENUS,
vpId: serverViewportId,
});

// Resend requests for links from other viewports already on page, they may be linkable to this viewport
Array.from(viewports.entries())
.filter(
([id, { disabled }]) => id !== serverViewportId && !disabled
)
.forEach(([vpId]) => {
this.sendMessageToServer({
type: Message.GET_VP_VISUAL_LINKS,
vpId,
});
});
}
}
}
break;

case "REMOVE_VP_SUCCESS":
{
const viewport = viewports.get(body.viewPortId);
Expand Down Expand Up @@ -891,43 +921,6 @@ export class ServerProxy {
}
break;

case Message.TABLE_LIST_RESP:
this.postMessageToClient({
type: Message.TABLE_LIST_RESP,
tables: body.tables,
requestId,
} as VuuUIMessageInTableList);
break;

case Message.TABLE_META_RESP:
// This request may have originated from client or may have been made by
// ServerProxy whilst creating a new subscription
{
const tableSchema = this.cacheTableMeta(body);
const clientViewportId = this.pendingTableMetaRequests.get(requestId);
if (clientViewportId) {
this.pendingTableMetaRequests.delete(requestId);
// If the viewport is still stored under clientViewportId, the subscription has not
// yet been acknowledged and client not informed. If the subscription has already
// been acknowledged, the viewport will be stored under serverViewportId;
const viewport = this.viewports.get(clientViewportId);
if (viewport) {
viewport.setTableSchema(tableSchema);
} else {
warn?.(
"Message has come back AFTER CREATE_VP_SUCCESS, what do we do now"
);
}
} else {
this.postMessageToClient({
type: Message.TABLE_META_RESP,
tableSchema,
requestId,
} as VuuUIMessageInTableMeta);
}
}
break;

case "VP_VISUAL_LINKS_RESP":
{
const activeLinkDescriptors = this.getActiveLinks(body.links);
Expand Down Expand Up @@ -1073,10 +1066,6 @@ export class ServerProxy {
}
}

private hasSchemaForTable(table: VuuTable) {
return this.cachedTableSchemas.has(`${table.module}:${table.table}`);
}

private cacheTableMeta(messageBody: ServerToClientTableMeta): TableSchema {
const { module, table } = messageBody.table;
const key = `${module}:${table}`;
Expand Down
Loading

0 comments on commit 8e9ffff

Please sign in to comment.