diff --git a/src/datasource/graphene/backend.ts b/src/datasource/graphene/backend.ts index 527972bec6..61da5d7b2f 100644 --- a/src/datasource/graphene/backend.ts +++ b/src/datasource/graphene/backend.ts @@ -16,33 +16,34 @@ import { debounce } from "lodash-es"; import { - WithParameters, - withChunkManager, Chunk, ChunkSource, + WithParameters, + withChunkManager, } from "#src/chunk_manager/backend.js"; import { ChunkPriorityTier, ChunkState } from "#src/chunk_manager/base.js"; +import type { CredentialsProvider } from "#src/credentials_provider/index.js"; import { WithSharedCredentialsProviderCounterpart } from "#src/credentials_provider/shared_counterpart.js"; import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js"; import { - getGrapheneFragmentKey, - GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, - responseIdentity, - ChunkedGraphSourceParameters, - MeshSourceParameters, CHUNKED_GRAPH_LAYER_RPC_ID, CHUNKED_GRAPH_RENDER_LAYER_UPDATE_SOURCES_RPC_ID, + ChunkedGraphSourceParameters, + GRAPHENE_MESH_NEW_SEGMENT_RPC_ID, + MeshSourceParameters, RENDER_RATIO_LIMIT, + getGrapheneFragmentKey, isBaseSegmentId, + responseIdentity, } from "#src/datasource/graphene/base.js"; import { decodeManifestChunk } from "#src/datasource/precomputed/backend.js"; import type { FragmentChunk, ManifestChunk } from "#src/mesh/backend.js"; -import { assignMeshFragmentData, MeshSource } from "#src/mesh/backend.js"; +import { MeshSource, assignMeshFragmentData } from "#src/mesh/backend.js"; import { decodeDraco } from "#src/mesh/draco/index.js"; import type { DisplayDimensionRenderInfo } from "#src/navigation_state.js"; import type { - RenderedViewBackend, RenderLayerBackendAttachment, + RenderedViewBackend, } from "#src/render_layer_backend.js"; import { RenderLayerBackend } from "#src/render_layer_backend.js"; import { withSegmentationLayerBackendState } from "#src/segmentation_display_state/backend.js"; @@ -51,8 +52,8 @@ import type { SharedWatchableValue } from "#src/shared_watchable_value.js"; import type { SliceViewChunkSourceBackend } from "#src/sliceview/backend.js"; import { deserializeTransformedSources } from "#src/sliceview/backend.js"; import type { - TransformedSource, SliceViewProjectionParameters, + TransformedSource, } from "#src/sliceview/base.js"; import { forEachPlaneIntersectingVolumetricChunk, @@ -64,6 +65,7 @@ import { fetchSpecialHttpByteRange } from "#src/util/byte_range_http_requests.js import type { CancellationToken } from "#src/util/cancellation.js"; import { vec3, vec3Key } from "#src/util/geom.js"; import { responseArrayBuffer, responseJson } from "#src/util/http_request.js"; +import { Signal } from "#src/util/signal.js"; import type { SpecialProtocolCredentials, SpecialProtocolCredentialsProvider, @@ -76,7 +78,7 @@ import { withSharedVisibility, } from "#src/visibility_priority/backend.js"; import type { RPC } from "#src/worker_rpc.js"; -import { registerSharedObject, registerRPC } from "#src/worker_rpc.js"; +import { registerRPC, registerSharedObject } from "#src/worker_rpc.js"; function getVerifiedFragmentPromise( credentialsProvider: SpecialProtocolCredentialsProvider, @@ -260,6 +262,60 @@ function decodeChunkedGraphChunk(leaves: string[]) { return final; } +class BulkRequestProxy { + pendingRequests = new Map< + string, + [Signal<(request: any) => void>, Uint64Set] + >(); + + constructor( + private parameters: ChunkedGraphSourceParameters, + private credentialsProvider?: CredentialsProvider, + ) {} + + async request(segment: Uint64, bounds: string): Promise { + responseIdentity; + const { pendingRequests } = this; + let pendingRequest = pendingRequests.get(bounds); + if (!pendingRequest) { + const { parameters, credentialsProvider } = this; + const signal = new Signal<(request: any) => void>(); + pendingRequest = [signal, new Uint64Set()]; + pendingRequests.set(bounds, pendingRequest); + setTimeout(async () => { + const pendingRequest = pendingRequests.get(bounds); + pendingRequests.delete(bounds); + if (!pendingRequest) { + console.error("how could this happen?"); + return; + } + const [_, segments] = pendingRequest; + _; + const response = await cancellableFetchSpecialOk( + credentialsProvider, + `${parameters.url}/leaves_many?int64_as_str=1&bounds=${bounds}`, + { + method: "POST", + body: JSON.stringify({ + node_ids: [...segments], + }), + }, + responseJson, + ); + signal.dispatch(response); + }, 0); + } + const [request, segments] = pendingRequest; + segments.add(segment); + return new Promise((f) => { + const unregister = request.add((response) => { + f(response[segment.toJSON()]); + unregister(); + }); + }); + } +} + @registerSharedObject() export class GrapheneChunkedGraphChunkSource extends WithParameters( WithSharedCredentialsProviderCounterpart()( @@ -271,6 +327,7 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( chunks: Map; tempChunkDataSize: Uint32Array; tempChunkPosition: Float32Array; + requestProxy: BulkRequestProxy; constructor(rpc: RPC, options: any) { super(rpc, options); @@ -278,13 +335,16 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( const rank = this.spec.rank; this.tempChunkDataSize = new Uint32Array(rank); this.tempChunkPosition = new Float32Array(rank); + this.requestProxy = new BulkRequestProxy( + this.parameters, + this.credentialsProvider, + ); } async download( chunk: ChunkedGraphChunk, cancellationToken: CancellationToken, ): Promise { - const { parameters } = this; const chunkPosition = this.computeChunkBounds(chunk); const chunkDataSize = chunk.chunkDataSize!; const bounds = @@ -292,22 +352,12 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters( `${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` + `${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`; - const request = cancellableFetchSpecialOk( - this.credentialsProvider, - `${parameters.url}/${chunk.segment}/leaves?int64_as_str=1&bounds=${bounds}`, - {}, - responseIdentity, - cancellationToken, - ); - await this.withErrorMessage( - request, - `Fetching leaves of segment ${chunk.segment} in region ${bounds}: `, - ) - .then((res) => res.json()) - .then((res) => { - chunk.leaves = decodeChunkedGraphChunk(res.leaf_ids); - }) - .catch((err) => console.error(err)); + // not using cancellationToken due to complexity with bulk request (would only want to cancel if member agreed) + cancellationToken; + + const request = await this.requestProxy.request(chunk.segment, bounds); + chunk.leaves = decodeChunkedGraphChunk(request); + // .catch((err) => console.error(err)); } getChunk(chunkGridPosition: Float32Array, segment: Uint64) { @@ -466,7 +516,6 @@ export class ChunkedGraphLayer extends withSegmentationLayerBackendState( forEachVisibleSegment(this, (segment, _) => { if (isBaseSegmentId(segment, this.nBitsForLayerId.value)) return; // TODO maybe support highBitRepresentation? - // TODO const chunk = source.getChunk(curPositionInChunks, segment.clone()); chunkManager.requestChunk( chunk,