From bb5a7ce17eefc7ea923dc39b28007ec230012c10 Mon Sep 17 00:00:00 2001 From: Eric Trautman Date: Tue, 26 Mar 2024 10:27:09 -0400 Subject: [PATCH] add numberOfStacksPerBatch parameter to UnconnectedCrossMFOVParameters to allow match requests to be throttled --- .../UnconnectedCrossMFOVParameters.java | 27 ++++++++++++ .../multisem/UnconnectedCrossMFOVClient.java | 41 ++++++++++++------- 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/UnconnectedCrossMFOVParameters.java b/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/UnconnectedCrossMFOVParameters.java index 81ffd73f7..213c184ce 100644 --- a/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/UnconnectedCrossMFOVParameters.java +++ b/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/UnconnectedCrossMFOVParameters.java @@ -4,8 +4,11 @@ import java.io.Reader; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import org.janelia.alignment.json.JsonUtils; +import org.janelia.alignment.spec.stack.StackWithZValues; /** * Parameters for finding and storing unconnected MFOVs. @@ -33,6 +36,11 @@ public class UnconnectedCrossMFOVParameters ) public String unconnectedMFOVPairsDirectory; + @Parameter( + names = "--numberOfStacksPerBatch", + description = "Number of stacks to process in each batch when using spark") + public int numberOfStacksPerBatch = 1; + public UnconnectedCrossMFOVParameters() { } @@ -47,6 +55,25 @@ public static UnconnectedCrossMFOVParameters fromJson(final Reader json) { return JSON_HELPER.fromJson(json); } + /** + * @return the specified StackWithZValues list bundled into groups of numberOfStacksPerBatch. + */ + public List> bundleStacks(final List stackList) { + final List> bundles = new ArrayList<>(); + List currentBundle = new ArrayList<>(); + for (final StackWithZValues stack : stackList) { + currentBundle.add(stack); + if (currentBundle.size() == numberOfStacksPerBatch) { + bundles.add(currentBundle); + currentBundle = new ArrayList<>(); + } + } + if (! currentBundle.isEmpty()) { + bundles.add(currentBundle); + } + return bundles; + } + private static final JsonUtils.Helper JSON_HELPER = new JsonUtils.Helper<>(UnconnectedCrossMFOVParameters.class); } diff --git a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/multisem/UnconnectedCrossMFOVClient.java b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/multisem/UnconnectedCrossMFOVClient.java index b7e619936..b5b2cb376 100644 --- a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/multisem/UnconnectedCrossMFOVClient.java +++ b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/multisem/UnconnectedCrossMFOVClient.java @@ -6,6 +6,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -128,28 +129,40 @@ private List findUnconnectedMFOVs(final JavaSparkC final MultiProjectParameters multiProjectParameters = clientParameters.multiProject; final String baseDataUrl = multiProjectParameters.getBaseDataUrl(); final List stackWithZValuesList = multiProjectParameters.buildListOfStackWithAllZ(); + final List> bundledStackList = clientParameters.core.bundleStacks(stackWithZValuesList); - LOG.info("findUnconnectedMFOVs: distributing tasks for {} stacks", stackWithZValuesList.size()); + LOG.info("findUnconnectedMFOVs: distributing tasks for {} bundles of {} stacks", + bundledStackList.size(), stackWithZValuesList.size()); - final JavaRDD rddStackWithZValues = sparkContext.parallelize(stackWithZValuesList); + final JavaRDD> rddStackWithZValues = sparkContext.parallelize(bundledStackList); - final Function findFunction = stackWithZ -> { + final Function, List> findFunction = stackWithZList -> { - LogUtilities.setupExecutorLog4j(stackWithZ.toString()); + final List unconnectedMFOVsForEachStack = new ArrayList<>(); - final StackId stackId = stackWithZ.getStackId(); - final RenderDataClient localDataClient = new RenderDataClient(baseDataUrl, - stackId.getOwner(), - stackId.getProject()); - final org.janelia.render.client.multisem.UnconnectedCrossMFOVClient jClient = clientParameters.buildJavaClient(); + for (final StackWithZValues stackWithZ : stackWithZList) { + + LogUtilities.setupExecutorLog4j(stackWithZ.toString()); + + final StackId stackId = stackWithZ.getStackId(); + final RenderDataClient localDataClient = new RenderDataClient(baseDataUrl, + stackId.getOwner(), + stackId.getProject()); + final org.janelia.render.client.multisem.UnconnectedCrossMFOVClient jClient = + clientParameters.buildJavaClient(); + + unconnectedMFOVsForEachStack.add( + jClient.findUnconnectedMFOVs(stackWithZ, + multiProjectParameters.deriveMatchCollectionNamesFromProject, + localDataClient)); + } - return jClient.findUnconnectedMFOVs(stackWithZ, - multiProjectParameters.deriveMatchCollectionNamesFromProject, - localDataClient); + return unconnectedMFOVsForEachStack; }; - final JavaRDD rddUnconnected = rddStackWithZValues.map(findFunction); - final List possiblyEmptyUnconnectedList = rddUnconnected.collect(); + final JavaRDD> rddUnconnected = rddStackWithZValues.map(findFunction); + final List possiblyEmptyUnconnectedList = + rddUnconnected.collect().stream().flatMap(List::stream).collect(Collectors.toList()); LOG.info("findUnconnectedMFOVs: collected {} items from rddUnconnected", possiblyEmptyUnconnectedList.size());