Skip to content

Commit

Permalink
implemented leaves_many using a proxy object
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisj committed Apr 19, 2024
1 parent 366afcc commit 110cfa3
Showing 1 changed file with 78 additions and 29 deletions.
107 changes: 78 additions & 29 deletions src/datasource/graphene/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,34 @@

import { debounce } from "lodash-es";
import {
WithParameters,
withChunkManager,
Chunk,
ChunkSource,
WithParameters,
withChunkManager,
} from "#src/chunk_manager/backend.js";
import { ChunkPriorityTier, ChunkState } from "#src/chunk_manager/base.js";
import type { CredentialsProvider } from "#src/credentials_provider/index.js";
import { WithSharedCredentialsProviderCounterpart } from "#src/credentials_provider/shared_counterpart.js";
import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js";
import {
getGrapheneFragmentKey,
GRAPHENE_MESH_NEW_SEGMENT_RPC_ID,
responseIdentity,
ChunkedGraphSourceParameters,
MeshSourceParameters,
CHUNKED_GRAPH_LAYER_RPC_ID,
CHUNKED_GRAPH_RENDER_LAYER_UPDATE_SOURCES_RPC_ID,
ChunkedGraphSourceParameters,
GRAPHENE_MESH_NEW_SEGMENT_RPC_ID,
MeshSourceParameters,
RENDER_RATIO_LIMIT,
getGrapheneFragmentKey,
isBaseSegmentId,
responseIdentity,
} from "#src/datasource/graphene/base.js";
import { decodeManifestChunk } from "#src/datasource/precomputed/backend.js";
import type { FragmentChunk, ManifestChunk } from "#src/mesh/backend.js";
import { assignMeshFragmentData, MeshSource } from "#src/mesh/backend.js";
import { MeshSource, assignMeshFragmentData } from "#src/mesh/backend.js";
import { decodeDraco } from "#src/mesh/draco/index.js";
import type { DisplayDimensionRenderInfo } from "#src/navigation_state.js";
import type {
RenderedViewBackend,
RenderLayerBackendAttachment,
RenderedViewBackend,
} from "#src/render_layer_backend.js";
import { RenderLayerBackend } from "#src/render_layer_backend.js";
import { withSegmentationLayerBackendState } from "#src/segmentation_display_state/backend.js";
Expand All @@ -51,8 +52,8 @@ import type { SharedWatchableValue } from "#src/shared_watchable_value.js";
import type { SliceViewChunkSourceBackend } from "#src/sliceview/backend.js";
import { deserializeTransformedSources } from "#src/sliceview/backend.js";
import type {
TransformedSource,
SliceViewProjectionParameters,
TransformedSource,
} from "#src/sliceview/base.js";
import {
forEachPlaneIntersectingVolumetricChunk,
Expand All @@ -64,6 +65,7 @@ import { fetchSpecialHttpByteRange } from "#src/util/byte_range_http_requests.js
import type { CancellationToken } from "#src/util/cancellation.js";
import { vec3, vec3Key } from "#src/util/geom.js";
import { responseArrayBuffer, responseJson } from "#src/util/http_request.js";
import { Signal } from "#src/util/signal.js";
import type {
SpecialProtocolCredentials,
SpecialProtocolCredentialsProvider,
Expand All @@ -76,7 +78,7 @@ import {
withSharedVisibility,
} from "#src/visibility_priority/backend.js";
import type { RPC } from "#src/worker_rpc.js";
import { registerSharedObject, registerRPC } from "#src/worker_rpc.js";
import { registerRPC, registerSharedObject } from "#src/worker_rpc.js";

function getVerifiedFragmentPromise(
credentialsProvider: SpecialProtocolCredentialsProvider,
Expand Down Expand Up @@ -260,6 +262,60 @@ function decodeChunkedGraphChunk(leaves: string[]) {
return final;
}

class BulkRequestProxy {
pendingRequests = new Map<
string,
[Signal<(request: any) => void>, Uint64Set]
>();

constructor(
private parameters: ChunkedGraphSourceParameters,
private credentialsProvider?: CredentialsProvider<any>,
) {}

async request(segment: Uint64, bounds: string): Promise<any> {
responseIdentity;
const { pendingRequests } = this;
let pendingRequest = pendingRequests.get(bounds);
if (!pendingRequest) {
const { parameters, credentialsProvider } = this;
const signal = new Signal<(request: any) => void>();
pendingRequest = [signal, new Uint64Set()];
pendingRequests.set(bounds, pendingRequest);
setTimeout(async () => {
const pendingRequest = pendingRequests.get(bounds);
pendingRequests.delete(bounds);
if (!pendingRequest) {
console.error("how could this happen?");
return;
}
const [_, segments] = pendingRequest;
_;
const response = await cancellableFetchSpecialOk(
credentialsProvider,
`${parameters.url}/leaves_many?int64_as_str=1&bounds=${bounds}`,
{
method: "POST",
body: JSON.stringify({
node_ids: [...segments],
}),
},
responseJson,
);
signal.dispatch(response);
}, 0);
}
const [request, segments] = pendingRequest;
segments.add(segment);
return new Promise((f) => {
const unregister = request.add((response) => {
f(response[segment.toJSON()]);
unregister();
});
});
}
}

@registerSharedObject()
export class GrapheneChunkedGraphChunkSource extends WithParameters(
WithSharedCredentialsProviderCounterpart<SpecialProtocolCredentials>()(
Expand All @@ -271,43 +327,37 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
chunks: Map<string, ChunkedGraphChunk>;
tempChunkDataSize: Uint32Array;
tempChunkPosition: Float32Array;
requestProxy: BulkRequestProxy;

constructor(rpc: RPC, options: any) {
super(rpc, options);
this.spec = options.spec;
const rank = this.spec.rank;
this.tempChunkDataSize = new Uint32Array(rank);
this.tempChunkPosition = new Float32Array(rank);
this.requestProxy = new BulkRequestProxy(
this.parameters,
this.credentialsProvider,
);
}

async download(
chunk: ChunkedGraphChunk,
cancellationToken: CancellationToken,
): Promise<void> {
const { parameters } = this;
const chunkPosition = this.computeChunkBounds(chunk);
const chunkDataSize = chunk.chunkDataSize!;
const bounds =
`${chunkPosition[0]}-${chunkPosition[0] + chunkDataSize[0]}_` +
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`;

const request = cancellableFetchSpecialOk(
this.credentialsProvider,
`${parameters.url}/${chunk.segment}/leaves?int64_as_str=1&bounds=${bounds}`,
{},
responseIdentity,
cancellationToken,
);
await this.withErrorMessage(
request,
`Fetching leaves of segment ${chunk.segment} in region ${bounds}: `,
)
.then((res) => res.json())
.then((res) => {
chunk.leaves = decodeChunkedGraphChunk(res.leaf_ids);
})
.catch((err) => console.error(err));
// not using cancellationToken due to complexity with bulk request (would only want to cancel if member agreed)
cancellationToken;

const request = await this.requestProxy.request(chunk.segment, bounds);
chunk.leaves = decodeChunkedGraphChunk(request);
// .catch((err) => console.error(err));
}

getChunk(chunkGridPosition: Float32Array, segment: Uint64) {
Expand Down Expand Up @@ -466,7 +516,6 @@ export class ChunkedGraphLayer extends withSegmentationLayerBackendState(

forEachVisibleSegment(this, (segment, _) => {
if (isBaseSegmentId(segment, this.nBitsForLayerId.value)) return; // TODO maybe support highBitRepresentation?
// TODO
const chunk = source.getChunk(curPositionInChunks, segment.clone());
chunkManager.requestChunk(
chunk,
Expand Down

0 comments on commit 110cfa3

Please sign in to comment.