From 71df1bcaf30139fe92be08c51668163cee53709b Mon Sep 17 00:00:00 2001 From: Eric Trautman Date: Sat, 20 Apr 2024 16:58:16 -0400 Subject: [PATCH] add mask hack client for multi-SEM --- .../client/parameter/MaskHackParameters.java | 67 +++++++ .../client/spark/mask/MaskHackClient.java | 185 ++++++++++++++++++ .../pipeline/AlignmentPipelineParameters.java | 12 +- .../pipeline/AlignmentPipelineStepId.java | 4 +- 4 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 render-ws-java-client/src/main/java/org/janelia/render/client/parameter/MaskHackParameters.java create mode 100644 render-ws-spark-client/src/main/java/org/janelia/render/client/spark/mask/MaskHackClient.java diff --git a/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/MaskHackParameters.java b/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/MaskHackParameters.java new file mode 100644 index 000000000..70cb2a5d7 --- /dev/null +++ b/render-ws-java-client/src/main/java/org/janelia/render/client/parameter/MaskHackParameters.java @@ -0,0 +1,67 @@ +package org.janelia.render.client.parameter; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.beust.jcommander.ParametersDelegate; + +import java.io.Serializable; + +import org.janelia.alignment.json.JsonUtils; +import org.janelia.alignment.loader.DynamicMaskLoader; +import org.janelia.alignment.spec.stack.StackId; +import org.janelia.render.client.newsolver.setup.TargetStackParameters; + +// TODO: move/merge this into larger copy stack parameters class when the copy clients get updated + +/** + * Parameters for adding or updating masks. + * + * @author Eric Trautman + */ +@Parameters +public class MaskHackParameters + implements Serializable { + + @ParametersDelegate + public TargetStackParameters targetStack; + + @Parameter( + names = "--dynamicMaskValue", + description = "Value (without mask:// prefix) for all masks, e.g. " + + "outside-box?minX=0&minY=10&maxX=2000&maxY=1748&width=2000&height=1748") + public String dynamicMaskValue; + + public MaskHackParameters() { + } + + public void validate() + throws IllegalArgumentException { + + if (dynamicMaskValue == null) { + throw new IllegalArgumentException("must specify --dynamicMaskValue"); + } else { + DynamicMaskLoader.parseUrl(getDynamicMaskUrl()); // throws exception if mask value is invalid + } + } + + public String getDynamicMaskUrl() { + return dynamicMaskValue == null ? null : "mask://" + dynamicMaskValue; + } + + public MaskHackParameters buildPipelineClone(final StackId sourceStackId) { + final MaskHackParameters clone = this.clone(); + clone.targetStack.setValuesFromPipeline(sourceStackId, "_masked"); + return clone; + } + + /** (Slowly) creates a clone of this setup by serializing it to and from JSON. */ + @SuppressWarnings("MethodDoesntCallSuperMethod") + public MaskHackParameters clone() { + final String json = JSON_HELPER.toJson(this); + return JSON_HELPER.fromJson(json); + } + + private static final JsonUtils.Helper JSON_HELPER = + new JsonUtils.Helper<>(MaskHackParameters.class); + +} \ No newline at end of file diff --git a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/mask/MaskHackClient.java b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/mask/MaskHackClient.java new file mode 100644 index 000000000..8d2df52a4 --- /dev/null +++ b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/mask/MaskHackClient.java @@ -0,0 +1,185 @@ +package org.janelia.render.client.spark.mask; + +import com.beust.jcommander.ParametersDelegate; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.janelia.alignment.ImageAndMask; +import org.janelia.alignment.loader.ImageLoader; +import org.janelia.alignment.spec.ChannelSpec; +import org.janelia.alignment.spec.ResolvedTileSpecCollection; +import org.janelia.alignment.spec.stack.StackId; +import org.janelia.alignment.spec.stack.StackMetaData; +import org.janelia.alignment.spec.stack.StackWithZValues; +import org.janelia.render.client.ClientRunner; +import org.janelia.render.client.RenderDataClient; +import org.janelia.render.client.newsolver.setup.TargetStackParameters; +import org.janelia.render.client.parameter.CommandLineParameters; +import org.janelia.render.client.parameter.MaskHackParameters; +import org.janelia.render.client.parameter.MultiProjectParameters; +import org.janelia.render.client.spark.LogUtilities; +import org.janelia.render.client.spark.pipeline.AlignmentPipelineParameters; +import org.janelia.render.client.spark.pipeline.AlignmentPipelineStep; +import org.janelia.render.client.spark.pipeline.AlignmentPipelineStepId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO: merge mask option with the spark and java copy stack clients when there is more time + +/** + * Spark client for copying a stack and adding/updating a mask for all tiles in the copied stack. + * + * @author Eric Trautman + */ +public class MaskHackClient + implements Serializable, AlignmentPipelineStep { + + public static class Parameters extends CommandLineParameters { + @ParametersDelegate + public MultiProjectParameters multiProject; + + @ParametersDelegate + public MaskHackParameters maskHack; + + public Parameters() { + this(new MultiProjectParameters(), new MaskHackParameters()); + } + + public Parameters(final MultiProjectParameters multiProject, + final MaskHackParameters maskHack) { + this.multiProject = multiProject; + this.maskHack = maskHack; + } + } + + /** Run the client with command line parameters. */ + public static void main(final String[] args) { + final ClientRunner clientRunner = new ClientRunner(args) { + @Override + public void runClient(final String[] args) throws Exception { + final Parameters parameters = new Parameters(); + parameters.parse(args); + final MaskHackClient client = new MaskHackClient(); + client.createContextAndRun(parameters); + } + }; + clientRunner.run(); + } + + /** Empty constructor required for alignment pipeline steps. */ + public MaskHackClient() { + } + + /** Create a spark context and run the client with the specified parameters. */ + public void createContextAndRun(final Parameters clientParameters) throws IOException { + final SparkConf conf = new SparkConf().setAppName(getClass().getSimpleName()); + try (final JavaSparkContext sparkContext = new JavaSparkContext(conf)) { + LOG.info("createContextAndRun: appId is {}", sparkContext.getConf().getAppId()); + copyStackAndSetMasks(sparkContext, clientParameters); + } + } + + /** Validates the specified pipeline parameters are sufficient. */ + @Override + public void validatePipelineParameters(final AlignmentPipelineParameters pipelineParameters) + throws IllegalArgumentException { + AlignmentPipelineParameters.validateRequiredElementExists("mask", + pipelineParameters.getMaskHack()); + } + + /** Run the client as part of an alignment pipeline. */ + public void runPipelineStep(final JavaSparkContext sparkContext, + final AlignmentPipelineParameters pipelineParameters) + throws IllegalArgumentException, IOException { + // do not use naming groups because hacked masks need to be applied to different types of stacks + final Parameters clientParameters = new Parameters(pipelineParameters.getMultiProject(null), + pipelineParameters.getMaskHack()); + copyStackAndSetMasks(sparkContext, clientParameters); + } + + @Override + public AlignmentPipelineStepId getDefaultStepId() { + return AlignmentPipelineStepId.HACK_MASK; + } + + private void copyStackAndSetMasks(final JavaSparkContext sparkContext, + final Parameters clientParameters) + throws IOException { + + LOG.info("copyStackAndSetMasks: entry, clientParameters={}", clientParameters); + + final MultiProjectParameters multiProjectParameters = clientParameters.multiProject; + final String baseDataUrl = multiProjectParameters.getBaseDataUrl(); + final MaskHackParameters maskHack = clientParameters.maskHack; + final List stackWithZValuesList = multiProjectParameters.buildListOfStackWithAllZ(); + + final JavaRDD rddStackWithZValues = sparkContext.parallelize(stackWithZValuesList); + + final Integer zeroLevelKey = 0; + final String dynamicMaskUrl = maskHack.getDynamicMaskUrl(); + + final Function copyAndSetMaskFunction = stackWithZ -> { + + LogUtilities.setupExecutorLog4j(stackWithZ.toString()); + + final StackId stackId = stackWithZ.getStackId(); + final RenderDataClient sourceDataClient = new RenderDataClient(baseDataUrl, + stackId.getOwner(), + stackId.getProject()); + final TargetStackParameters target = maskHack.buildPipelineClone(stackId).targetStack; + + final RenderDataClient targetDataClient = new RenderDataClient(baseDataUrl, + target.owner, + target.project); + + final StackMetaData sourceMetaData = sourceDataClient.getStackMetaData(stackId.getStack()); + targetDataClient.setupDerivedStack(sourceMetaData, target.stack); + + for (final Double z : stackWithZ.getzValues()) { + + final ResolvedTileSpecCollection resolvedTiles = sourceDataClient.getResolvedTiles(stackId.getStack(), + z); + resolvedTiles.getTileSpecs().forEach(tileSpec -> { + + for (final ChannelSpec channelSpec : tileSpec.getAllChannels()) { + + final Map.Entry entry = channelSpec.getFirstMipmapEntry(); + + if ((entry != null) && zeroLevelKey.equals(entry.getKey())) { + final ImageAndMask sourceImageAndMask = entry.getValue(); + final ImageAndMask updatedImageAndMask = + sourceImageAndMask.copyWithMask(dynamicMaskUrl, + ImageLoader.LoaderType.DYNAMIC_MASK, + null); + channelSpec.putMipmap(zeroLevelKey, updatedImageAndMask); + } + } + + }); + + targetDataClient.saveResolvedTiles(resolvedTiles, target.stack, z); + } + + if (target.completeStack) { + targetDataClient.setStackState(target.stack, StackMetaData.StackState.COMPLETE); + } + + return null; + }; + + final JavaRDD rddCopyAndSetMask = rddStackWithZValues.map(copyAndSetMaskFunction); + rddCopyAndSetMask.collect(); + + LOG.info("copyStackAndSetMasks: collected rddCopyAndSetMask"); + LOG.info("copyStackAndSetMasks: exit"); + } + + private static final Logger LOG = LoggerFactory.getLogger(MaskHackClient.class); +} diff --git a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineParameters.java b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineParameters.java index e04d013c4..81d567f54 100644 --- a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineParameters.java +++ b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineParameters.java @@ -15,6 +15,7 @@ import org.janelia.render.client.newsolver.setup.AffineBlockSolverSetup; import org.janelia.render.client.newsolver.setup.IntensityCorrectionSetup; import org.janelia.render.client.parameter.MFOVMontageMatchPatchParameters; +import org.janelia.render.client.parameter.MaskHackParameters; import org.janelia.render.client.parameter.MatchCopyParameters; import org.janelia.render.client.parameter.MipmapParameters; import org.janelia.render.client.parameter.MultiProjectParameters; @@ -45,6 +46,7 @@ public class AlignmentPipelineParameters private final AffineBlockSolverSetup affineBlockSolverSetup; private final IntensityCorrectionSetup intensityCorrectionSetup; private final ZSpacingParameters zSpacing; + private final MaskHackParameters maskHack; @SuppressWarnings("unused") public AlignmentPipelineParameters() { @@ -59,6 +61,7 @@ public AlignmentPipelineParameters() { null, null, null, + null, null); } @@ -73,7 +76,8 @@ public AlignmentPipelineParameters(final MultiProjectParameters multiProject, final MatchCopyParameters matchCopy, final AffineBlockSolverSetup affineBlockSolverSetup, final IntensityCorrectionSetup intensityCorrectionSetup, - final ZSpacingParameters zSpacing) { + final ZSpacingParameters zSpacing, + final MaskHackParameters maskHack) { this.multiProject = multiProject; this.pipelineStackGroups = pipelineStackGroups; this.pipelineSteps = pipelineSteps; @@ -86,6 +90,7 @@ public AlignmentPipelineParameters(final MultiProjectParameters multiProject, this.affineBlockSolverSetup = affineBlockSolverSetup; this.intensityCorrectionSetup = intensityCorrectionSetup; this.zSpacing = zSpacing; + this.maskHack = maskHack; } public MultiProjectParameters getMultiProject(final StackIdNamingGroup withNamingGroup) { @@ -145,12 +150,17 @@ public ZSpacingParameters getZSpacing() { return zSpacing; } + public MaskHackParameters getMaskHack() { + return maskHack; + } + /** * @return a list of clients for each specified pipeline step. * * @throws IllegalArgumentException * if no steps are defined or if any of the parameters are invalid. */ + public List buildStepClients() throws IllegalArgumentException { diff --git a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineStepId.java b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineStepId.java index 4d7aadd99..2831c14b2 100644 --- a/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineStepId.java +++ b/render-ws-spark-client/src/main/java/org/janelia/render/client/spark/pipeline/AlignmentPipelineStepId.java @@ -3,6 +3,7 @@ import java.util.function.Supplier; import org.janelia.render.client.spark.MipmapClient; +import org.janelia.render.client.spark.mask.MaskHackClient; import org.janelia.render.client.spark.match.ClusterCountClient; import org.janelia.render.client.spark.match.CopyMatchClient; import org.janelia.render.client.spark.match.MultiStagePointMatchClient; @@ -27,7 +28,8 @@ public enum AlignmentPipelineStepId { FILTER_MATCHES(CopyMatchClient::new), ALIGN_TILES(DistributedAffineBlockSolverClient::new), CORRECT_Z_POSITIONS(ZPositionCorrectionClient::new), - CORRECT_INTENSITY(DistributedIntensityCorrectionBlockSolverClient::new); + CORRECT_INTENSITY(DistributedIntensityCorrectionBlockSolverClient::new), + HACK_MASK(MaskHackClient::new); private final Supplier stepClientSupplier;