From fff1e073a46c0f640ad9ac1b97af1a184b3485f6 Mon Sep 17 00:00:00 2001 From: Chris Jordan Date: Thu, 18 Apr 2024 21:02:39 -0400 Subject: [PATCH] perf(datasource/graphene): implemented leaves_many using a proxy class to group together individual segment id requests within the same bounding box --- src/datasource/graphene/backend.ts | 138 +++++++++++++++++++---------- 1 file changed, 93 insertions(+), 45 deletions(-) diff --git a/src/datasource/graphene/backend.ts b/src/datasource/graphene/backend.ts index 527972bec..1ab931829 100644 --- a/src/datasource/graphene/backend.ts +++ b/src/datasource/graphene/backend.ts @@ -16,33 +16,33 @@ import { debounce } from "lodash-es"; import { - WithParameters, - withChunkManager, Chunk, ChunkSource, + WithParameters, + withChunkManager, } from "#src/chunk_manager/backend.js"; import { ChunkPriorityTier, ChunkState } from "#src/chunk_manager/base.js"; +import type { CredentialsProvider } from "#src/credentials_provider/index.js"; import { WithSharedCredentialsProviderCounterpart } from "#src/credentials_provider/shared_counterpart.js"; import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js"; import { - getGrapheneFragmentKey, - GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, - responseIdentity, - ChunkedGraphSourceParameters, - MeshSourceParameters, CHUNKED_GRAPH_LAYER_RPC_ID, CHUNKED_GRAPH_RENDER_LAYER_UPDATE_SOURCES_RPC_ID, + ChunkedGraphSourceParameters, + GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, + MeshSourceParameters, RENDER_RATIO_LIMIT, + getGrapheneFragmentKey, isBaseSegmentId, } from "#src/datasource/graphene/base.js"; import { decodeManifestChunk } from "#src/datasource/precomputed/backend.js"; import type { FragmentChunk, ManifestChunk } from "#src/mesh/backend.js"; -import { assignMeshFragmentData, MeshSource } from "#src/mesh/backend.js"; +import { MeshSource, assignMeshFragmentData } from "#src/mesh/backend.js"; import { decodeDraco } from "#src/mesh/draco/index.js"; import type { DisplayDimensionRenderInfo } from "#src/navigation_state.js"; import type { - RenderedViewBackend, RenderLayerBackendAttachment, + RenderedViewBackend, } from "#src/render_layer_backend.js"; import { RenderLayerBackend } from "#src/render_layer_backend.js"; import { withSegmentationLayerBackendState } from "#src/segmentation_display_state/backend.js"; @@ -51,8 +51,8 @@ import type { SharedWatchableValue } from "#src/shared_watchable_value.js"; import type { SliceViewChunkSourceBackend } from "#src/sliceview/backend.js"; import { deserializeTransformedSources } from "#src/sliceview/backend.js"; import type { - TransformedSource, SliceViewProjectionParameters, + TransformedSource, } from "#src/sliceview/base.js"; import { forEachPlaneIntersectingVolumetricChunk, @@ -61,9 +61,13 @@ import { import { computeChunkBounds } from "#src/sliceview/volume/backend.js"; import { Uint64Set } from "#src/uint64_set.js"; import { fetchSpecialHttpByteRange } from "#src/util/byte_range_http_requests.js"; -import type { CancellationToken } from "#src/util/cancellation.js"; +import { + CancellationTokenSource, + type CancellationToken, +} from "#src/util/cancellation.js"; import { vec3, vec3Key } from "#src/util/geom.js"; import { responseArrayBuffer, responseJson } from "#src/util/http_request.js"; +import { Signal } from "#src/util/signal.js"; import type { SpecialProtocolCredentials, SpecialProtocolCredentialsProvider, @@ -76,7 +80,7 @@ import { withSharedVisibility, } from "#src/visibility_priority/backend.js"; import type { RPC } from "#src/worker_rpc.js"; -import { registerSharedObject, registerRPC } from "#src/worker_rpc.js"; +import { registerRPC, registerSharedObject } from "#src/worker_rpc.js"; function getVerifiedFragmentPromise( credentialsProvider: SpecialProtocolCredentialsProvider, @@ -260,6 +264,74 @@ function decodeChunkedGraphChunk(leaves: string[]) { return final; } +class LeavesManyProxy { + pendingRequests = new Map< + string, + [Signal<(response: any) => void>, Uint64Set, CancellationTokenSource] + >(); + + constructor( + private parameters: ChunkedGraphSourceParameters, + private credentialsProvider?: CredentialsProvider, + ) {} + + async request( + segment: Uint64, + bounds: string, + cancellationToken: CancellationToken, + ): Promise { + const { pendingRequests } = this; + let pendingRequest = pendingRequests.get(bounds); + if (!pendingRequest) { + const { parameters, credentialsProvider } = this; + const signal = new Signal<(request: any) => void>(); + (signal as any).start = performance.now(); + const requestCancellationToken = new CancellationTokenSource(); + const segments = new Uint64Set(); + pendingRequest = [signal, segments, requestCancellationToken]; + pendingRequests.set(bounds, pendingRequest); + setTimeout(async () => { + pendingRequests.delete(bounds); + try { + const response = await cancellableFetchSpecialOk( + credentialsProvider, + `${parameters.url}/leaves_many?int64_as_str=1&bounds=${bounds}`, + { + method: "POST", + body: JSON.stringify({ + node_ids: [...segments], + }), + }, + responseJson, + requestCancellationToken, + ); + signal.dispatch(response); + } catch (e) { + signal.dispatch(e); + } + }, 0); + } + const [request, segments, requestCancellationToken] = pendingRequest; + segments.add(segment); + cancellationToken.add(() => { + segments.delete(segment); + if (segments.size === 0) { + requestCancellationToken.cancel(); + } + }); + return new Promise((f, r) => { + const unregister = request.add((response) => { + unregister(); + if (response instanceof Error) { + r(response); + } else { + f(response[segment.toJSON()]); + } + }); + }); + } +} + @registerSharedObject() export class GrapheneChunkedGraphChunkSource extends WithParameters( WithSharedCredentialsProviderCounterpart()( @@ -271,6 +343,7 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( chunks: Map; tempChunkDataSize: Uint32Array; tempChunkPosition: Float32Array; + leavesManyProxy: LeavesManyProxy; constructor(rpc: RPC, options: any) { super(rpc, options); @@ -278,13 +351,16 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( const rank = this.spec.rank; this.tempChunkDataSize = new Uint32Array(rank); this.tempChunkPosition = new Float32Array(rank); + this.leavesManyProxy = new LeavesManyProxy( + this.parameters, + this.credentialsProvider, + ); } async download( chunk: ChunkedGraphChunk, cancellationToken: CancellationToken, ): Promise { - const { parameters } = this; const chunkPosition = this.computeChunkBounds(chunk); const chunkDataSize = chunk.chunkDataSize!; const bounds = @@ -292,22 +368,12 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( `${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` + `${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`; - const request = cancellableFetchSpecialOk( - this.credentialsProvider, - `${parameters.url}/${chunk.segment}/leaves?int64_as_str=1&bounds=${bounds}`, - {}, - responseIdentity, + const request = await this.leavesManyProxy.request( + chunk.segment, + bounds, cancellationToken, ); - await this.withErrorMessage( - request, - `Fetching leaves of segment ${chunk.segment} in region ${bounds}: `, - ) - .then((res) => res.json()) - .then((res) => { - chunk.leaves = decodeChunkedGraphChunk(res.leaf_ids); - }) - .catch((err) => console.error(err)); + chunk.leaves = decodeChunkedGraphChunk(request); } getChunk(chunkGridPosition: Float32Array, segment: Uint64) { @@ -325,23 +391,6 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( computeChunkBounds(chunk: ChunkedGraphChunk) { return computeChunkBounds(this, chunk); } - - async withErrorMessage( - promise: Promise, - errorPrefix: string, - ): Promise { - const response = await promise; - if (response.ok) { - return response; - } - let msg: string; - try { - msg = (await response.json()).message; - } catch { - msg = await response.text(); - } - throw new Error(`[${response.status}] ${errorPrefix}${msg}`); - } } interface ChunkedGraphRenderLayerAttachmentState { @@ -466,7 +515,6 @@ export class ChunkedGraphLayer extends withSegmentationLayerBackendState( forEachVisibleSegment(this, (segment, _) => { if (isBaseSegmentId(segment, this.nBitsForLayerId.value)) return; // TODO maybe support highBitRepresentation? - // TODO const chunk = source.getChunk(curPositionInChunks, segment.clone()); chunkManager.requestChunk( chunk,