Skip to content

Commit

Permalink
add mask hack client for multi-SEM
Browse files Browse the repository at this point in the history
  • Loading branch information
trautmane committed Apr 20, 2024
1 parent d03f48b commit 71df1bc
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.janelia.render.client.parameter;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.ParametersDelegate;

import java.io.Serializable;

import org.janelia.alignment.json.JsonUtils;
import org.janelia.alignment.loader.DynamicMaskLoader;
import org.janelia.alignment.spec.stack.StackId;
import org.janelia.render.client.newsolver.setup.TargetStackParameters;

// TODO: move/merge this into larger copy stack parameters class when the copy clients get updated

/**
* Parameters for adding or updating masks.
*
* @author Eric Trautman
*/
@Parameters
public class MaskHackParameters
implements Serializable {

@ParametersDelegate
public TargetStackParameters targetStack;

@Parameter(
names = "--dynamicMaskValue",
description = "Value (without mask:// prefix) for all masks, e.g. " +
"outside-box?minX=0&minY=10&maxX=2000&maxY=1748&width=2000&height=1748")
public String dynamicMaskValue;

public MaskHackParameters() {
}

public void validate()
throws IllegalArgumentException {

if (dynamicMaskValue == null) {
throw new IllegalArgumentException("must specify --dynamicMaskValue");
} else {
DynamicMaskLoader.parseUrl(getDynamicMaskUrl()); // throws exception if mask value is invalid
}
}

public String getDynamicMaskUrl() {
return dynamicMaskValue == null ? null : "mask://" + dynamicMaskValue;
}

public MaskHackParameters buildPipelineClone(final StackId sourceStackId) {
final MaskHackParameters clone = this.clone();
clone.targetStack.setValuesFromPipeline(sourceStackId, "_masked");
return clone;
}

/** (Slowly) creates a clone of this setup by serializing it to and from JSON. */
@SuppressWarnings("MethodDoesntCallSuperMethod")
public MaskHackParameters clone() {
final String json = JSON_HELPER.toJson(this);
return JSON_HELPER.fromJson(json);
}

private static final JsonUtils.Helper<MaskHackParameters> JSON_HELPER =
new JsonUtils.Helper<>(MaskHackParameters.class);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package org.janelia.render.client.spark.mask;

import com.beust.jcommander.ParametersDelegate;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.janelia.alignment.ImageAndMask;
import org.janelia.alignment.loader.ImageLoader;
import org.janelia.alignment.spec.ChannelSpec;
import org.janelia.alignment.spec.ResolvedTileSpecCollection;
import org.janelia.alignment.spec.stack.StackId;
import org.janelia.alignment.spec.stack.StackMetaData;
import org.janelia.alignment.spec.stack.StackWithZValues;
import org.janelia.render.client.ClientRunner;
import org.janelia.render.client.RenderDataClient;
import org.janelia.render.client.newsolver.setup.TargetStackParameters;
import org.janelia.render.client.parameter.CommandLineParameters;
import org.janelia.render.client.parameter.MaskHackParameters;
import org.janelia.render.client.parameter.MultiProjectParameters;
import org.janelia.render.client.spark.LogUtilities;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineParameters;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineStep;
import org.janelia.render.client.spark.pipeline.AlignmentPipelineStepId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: merge mask option with the spark and java copy stack clients when there is more time

/**
* Spark client for copying a stack and adding/updating a mask for all tiles in the copied stack.
*
* @author Eric Trautman
*/
public class MaskHackClient
implements Serializable, AlignmentPipelineStep {

public static class Parameters extends CommandLineParameters {
@ParametersDelegate
public MultiProjectParameters multiProject;

@ParametersDelegate
public MaskHackParameters maskHack;

public Parameters() {
this(new MultiProjectParameters(), new MaskHackParameters());
}

public Parameters(final MultiProjectParameters multiProject,
final MaskHackParameters maskHack) {
this.multiProject = multiProject;
this.maskHack = maskHack;
}
}

/** Run the client with command line parameters. */
public static void main(final String[] args) {
final ClientRunner clientRunner = new ClientRunner(args) {
@Override
public void runClient(final String[] args) throws Exception {
final Parameters parameters = new Parameters();
parameters.parse(args);
final MaskHackClient client = new MaskHackClient();
client.createContextAndRun(parameters);
}
};
clientRunner.run();
}

/** Empty constructor required for alignment pipeline steps. */
public MaskHackClient() {
}

/** Create a spark context and run the client with the specified parameters. */
public void createContextAndRun(final Parameters clientParameters) throws IOException {
final SparkConf conf = new SparkConf().setAppName(getClass().getSimpleName());
try (final JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
LOG.info("createContextAndRun: appId is {}", sparkContext.getConf().getAppId());
copyStackAndSetMasks(sparkContext, clientParameters);
}
}

/** Validates the specified pipeline parameters are sufficient. */
@Override
public void validatePipelineParameters(final AlignmentPipelineParameters pipelineParameters)
throws IllegalArgumentException {
AlignmentPipelineParameters.validateRequiredElementExists("mask",
pipelineParameters.getMaskHack());
}

/** Run the client as part of an alignment pipeline. */
public void runPipelineStep(final JavaSparkContext sparkContext,
final AlignmentPipelineParameters pipelineParameters)
throws IllegalArgumentException, IOException {
// do not use naming groups because hacked masks need to be applied to different types of stacks
final Parameters clientParameters = new Parameters(pipelineParameters.getMultiProject(null),
pipelineParameters.getMaskHack());
copyStackAndSetMasks(sparkContext, clientParameters);
}

@Override
public AlignmentPipelineStepId getDefaultStepId() {
return AlignmentPipelineStepId.HACK_MASK;
}

private void copyStackAndSetMasks(final JavaSparkContext sparkContext,
final Parameters clientParameters)
throws IOException {

LOG.info("copyStackAndSetMasks: entry, clientParameters={}", clientParameters);

final MultiProjectParameters multiProjectParameters = clientParameters.multiProject;
final String baseDataUrl = multiProjectParameters.getBaseDataUrl();
final MaskHackParameters maskHack = clientParameters.maskHack;
final List<StackWithZValues> stackWithZValuesList = multiProjectParameters.buildListOfStackWithAllZ();

final JavaRDD<StackWithZValues> rddStackWithZValues = sparkContext.parallelize(stackWithZValuesList);

final Integer zeroLevelKey = 0;
final String dynamicMaskUrl = maskHack.getDynamicMaskUrl();

final Function<StackWithZValues, Void> copyAndSetMaskFunction = stackWithZ -> {

LogUtilities.setupExecutorLog4j(stackWithZ.toString());

final StackId stackId = stackWithZ.getStackId();
final RenderDataClient sourceDataClient = new RenderDataClient(baseDataUrl,
stackId.getOwner(),
stackId.getProject());
final TargetStackParameters target = maskHack.buildPipelineClone(stackId).targetStack;

final RenderDataClient targetDataClient = new RenderDataClient(baseDataUrl,
target.owner,
target.project);

final StackMetaData sourceMetaData = sourceDataClient.getStackMetaData(stackId.getStack());
targetDataClient.setupDerivedStack(sourceMetaData, target.stack);

for (final Double z : stackWithZ.getzValues()) {

final ResolvedTileSpecCollection resolvedTiles = sourceDataClient.getResolvedTiles(stackId.getStack(),
z);
resolvedTiles.getTileSpecs().forEach(tileSpec -> {

for (final ChannelSpec channelSpec : tileSpec.getAllChannels()) {

final Map.Entry<Integer, ImageAndMask> entry = channelSpec.getFirstMipmapEntry();

if ((entry != null) && zeroLevelKey.equals(entry.getKey())) {
final ImageAndMask sourceImageAndMask = entry.getValue();
final ImageAndMask updatedImageAndMask =
sourceImageAndMask.copyWithMask(dynamicMaskUrl,
ImageLoader.LoaderType.DYNAMIC_MASK,
null);
channelSpec.putMipmap(zeroLevelKey, updatedImageAndMask);
}
}

});

targetDataClient.saveResolvedTiles(resolvedTiles, target.stack, z);
}

if (target.completeStack) {
targetDataClient.setStackState(target.stack, StackMetaData.StackState.COMPLETE);
}

return null;
};

final JavaRDD<Void> rddCopyAndSetMask = rddStackWithZValues.map(copyAndSetMaskFunction);
rddCopyAndSetMask.collect();

LOG.info("copyStackAndSetMasks: collected rddCopyAndSetMask");
LOG.info("copyStackAndSetMasks: exit");
}

private static final Logger LOG = LoggerFactory.getLogger(MaskHackClient.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.janelia.render.client.newsolver.setup.AffineBlockSolverSetup;
import org.janelia.render.client.newsolver.setup.IntensityCorrectionSetup;
import org.janelia.render.client.parameter.MFOVMontageMatchPatchParameters;
import org.janelia.render.client.parameter.MaskHackParameters;
import org.janelia.render.client.parameter.MatchCopyParameters;
import org.janelia.render.client.parameter.MipmapParameters;
import org.janelia.render.client.parameter.MultiProjectParameters;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class AlignmentPipelineParameters
private final AffineBlockSolverSetup affineBlockSolverSetup;
private final IntensityCorrectionSetup intensityCorrectionSetup;
private final ZSpacingParameters zSpacing;
private final MaskHackParameters maskHack;

@SuppressWarnings("unused")
public AlignmentPipelineParameters() {
Expand All @@ -59,6 +61,7 @@ public AlignmentPipelineParameters() {
null,
null,
null,
null,
null);
}

Expand All @@ -73,7 +76,8 @@ public AlignmentPipelineParameters(final MultiProjectParameters multiProject,
final MatchCopyParameters matchCopy,
final AffineBlockSolverSetup affineBlockSolverSetup,
final IntensityCorrectionSetup intensityCorrectionSetup,
final ZSpacingParameters zSpacing) {
final ZSpacingParameters zSpacing,
final MaskHackParameters maskHack) {
this.multiProject = multiProject;
this.pipelineStackGroups = pipelineStackGroups;
this.pipelineSteps = pipelineSteps;
Expand All @@ -86,6 +90,7 @@ public AlignmentPipelineParameters(final MultiProjectParameters multiProject,
this.affineBlockSolverSetup = affineBlockSolverSetup;
this.intensityCorrectionSetup = intensityCorrectionSetup;
this.zSpacing = zSpacing;
this.maskHack = maskHack;
}

public MultiProjectParameters getMultiProject(final StackIdNamingGroup withNamingGroup) {
Expand Down Expand Up @@ -145,12 +150,17 @@ public ZSpacingParameters getZSpacing() {
return zSpacing;
}

public MaskHackParameters getMaskHack() {
return maskHack;
}

/**
* @return a list of clients for each specified pipeline step.
*
* @throws IllegalArgumentException
* if no steps are defined or if any of the parameters are invalid.
*/

public List<AlignmentPipelineStep> buildStepClients()
throws IllegalArgumentException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.function.Supplier;

import org.janelia.render.client.spark.MipmapClient;
import org.janelia.render.client.spark.mask.MaskHackClient;
import org.janelia.render.client.spark.match.ClusterCountClient;
import org.janelia.render.client.spark.match.CopyMatchClient;
import org.janelia.render.client.spark.match.MultiStagePointMatchClient;
Expand All @@ -27,7 +28,8 @@ public enum AlignmentPipelineStepId {
FILTER_MATCHES(CopyMatchClient::new),
ALIGN_TILES(DistributedAffineBlockSolverClient::new),
CORRECT_Z_POSITIONS(ZPositionCorrectionClient::new),
CORRECT_INTENSITY(DistributedIntensityCorrectionBlockSolverClient::new);
CORRECT_INTENSITY(DistributedIntensityCorrectionBlockSolverClient::new),
HACK_MASK(MaskHackClient::new);

private final Supplier<AlignmentPipelineStep> stepClientSupplier;

Expand Down

0 comments on commit 71df1bc

Please sign in to comment.