Skip to content

Commit

Permalink
add numberOfStacksPerBatch parameter to UnconnectedCrossMFOVParameter…
Browse files Browse the repository at this point in the history
…s to allow match requests to be throttled
  • Loading branch information
trautmane committed Mar 26, 2024
1 parent b6b5ae8 commit bb5a7ce
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import java.io.Reader;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.janelia.alignment.json.JsonUtils;
import org.janelia.alignment.spec.stack.StackWithZValues;

/**
* Parameters for finding and storing unconnected MFOVs.
Expand Down Expand Up @@ -33,6 +36,11 @@ public class UnconnectedCrossMFOVParameters
)
public String unconnectedMFOVPairsDirectory;

@Parameter(
names = "--numberOfStacksPerBatch",
description = "Number of stacks to process in each batch when using spark")
public int numberOfStacksPerBatch = 1;

public UnconnectedCrossMFOVParameters() {
}

Expand All @@ -47,6 +55,25 @@ public static UnconnectedCrossMFOVParameters fromJson(final Reader json) {
return JSON_HELPER.fromJson(json);
}

/**
* @return the specified StackWithZValues list bundled into groups of numberOfStacksPerBatch.
*/
public List<List<StackWithZValues>> bundleStacks(final List<StackWithZValues> stackList) {
final List<List<StackWithZValues>> bundles = new ArrayList<>();
List<StackWithZValues> currentBundle = new ArrayList<>();
for (final StackWithZValues stack : stackList) {
currentBundle.add(stack);
if (currentBundle.size() == numberOfStacksPerBatch) {
bundles.add(currentBundle);
currentBundle = new ArrayList<>();
}
}
if (! currentBundle.isEmpty()) {
bundles.add(currentBundle);
}
return bundles;
}

private static final JsonUtils.Helper<UnconnectedCrossMFOVParameters> JSON_HELPER =
new JsonUtils.Helper<>(UnconnectedCrossMFOVParameters.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
Expand Down Expand Up @@ -128,28 +129,40 @@ private List<UnconnectedMFOVPairsForStack> findUnconnectedMFOVs(final JavaSparkC
final MultiProjectParameters multiProjectParameters = clientParameters.multiProject;
final String baseDataUrl = multiProjectParameters.getBaseDataUrl();
final List<StackWithZValues> stackWithZValuesList = multiProjectParameters.buildListOfStackWithAllZ();
final List<List<StackWithZValues>> bundledStackList = clientParameters.core.bundleStacks(stackWithZValuesList);

LOG.info("findUnconnectedMFOVs: distributing tasks for {} stacks", stackWithZValuesList.size());
LOG.info("findUnconnectedMFOVs: distributing tasks for {} bundles of {} stacks",
bundledStackList.size(), stackWithZValuesList.size());

final JavaRDD<StackWithZValues> rddStackWithZValues = sparkContext.parallelize(stackWithZValuesList);
final JavaRDD<List<StackWithZValues>> rddStackWithZValues = sparkContext.parallelize(bundledStackList);

final Function<StackWithZValues, UnconnectedMFOVPairsForStack> findFunction = stackWithZ -> {
final Function<List<StackWithZValues>, List<UnconnectedMFOVPairsForStack>> findFunction = stackWithZList -> {

LogUtilities.setupExecutorLog4j(stackWithZ.toString());
final List<UnconnectedMFOVPairsForStack> unconnectedMFOVsForEachStack = new ArrayList<>();

final StackId stackId = stackWithZ.getStackId();
final RenderDataClient localDataClient = new RenderDataClient(baseDataUrl,
stackId.getOwner(),
stackId.getProject());
final org.janelia.render.client.multisem.UnconnectedCrossMFOVClient jClient = clientParameters.buildJavaClient();
for (final StackWithZValues stackWithZ : stackWithZList) {

LogUtilities.setupExecutorLog4j(stackWithZ.toString());

final StackId stackId = stackWithZ.getStackId();
final RenderDataClient localDataClient = new RenderDataClient(baseDataUrl,
stackId.getOwner(),
stackId.getProject());
final org.janelia.render.client.multisem.UnconnectedCrossMFOVClient jClient =
clientParameters.buildJavaClient();

unconnectedMFOVsForEachStack.add(
jClient.findUnconnectedMFOVs(stackWithZ,
multiProjectParameters.deriveMatchCollectionNamesFromProject,
localDataClient));
}

return jClient.findUnconnectedMFOVs(stackWithZ,
multiProjectParameters.deriveMatchCollectionNamesFromProject,
localDataClient);
return unconnectedMFOVsForEachStack;
};

final JavaRDD<UnconnectedMFOVPairsForStack> rddUnconnected = rddStackWithZValues.map(findFunction);
final List<UnconnectedMFOVPairsForStack> possiblyEmptyUnconnectedList = rddUnconnected.collect();
final JavaRDD<List<UnconnectedMFOVPairsForStack>> rddUnconnected = rddStackWithZValues.map(findFunction);
final List<UnconnectedMFOVPairsForStack> possiblyEmptyUnconnectedList =
rddUnconnected.collect().stream().flatMap(List::stream).collect(Collectors.toList());

LOG.info("findUnconnectedMFOVs: collected {} items from rddUnconnected", possiblyEmptyUnconnectedList.size());

Expand Down

0 comments on commit bb5a7ce

Please sign in to comment.