From 714a04e296bccdc74ec9aa4b70fef04ede63dfbe Mon Sep 17 00:00:00 2001 From: Stephan Preibisch Date: Tue, 29 Oct 2024 08:15:10 -0400 Subject: [PATCH] introduced an abstraction layer for all N5 dataset parameters and to potentially expose parts of the image (5D -> 3D for OME-ZARR), this allows us to directly use the N5ImageLoader for example for reading OME-ZARR based datasets. It also supports pre-fetching of all dataset attributes, which can slow down cloud processing if not done. I also added an instantiateN5Reader() method. --- .../java/bdv/export/n5/WriteSequenceToN5.java | 5 +- src/main/java/bdv/img/n5/BdvN5Format.java | 59 ++++++- src/main/java/bdv/img/n5/N5ImageLoader.java | 148 +++++++++++++++--- src/main/java/bdv/img/n5/N5Properties.java | 24 +++ 4 files changed, 214 insertions(+), 22 deletions(-) create mode 100644 src/main/java/bdv/img/n5/N5Properties.java diff --git a/src/main/java/bdv/export/n5/WriteSequenceToN5.java b/src/main/java/bdv/export/n5/WriteSequenceToN5.java index fb838439..5c28c0eb 100644 --- a/src/main/java/bdv/export/n5/WriteSequenceToN5.java +++ b/src/main/java/bdv/export/n5/WriteSequenceToN5.java @@ -59,7 +59,9 @@ import bdv.export.ProgressWriterNull; import bdv.export.SubTaskProgressWriter; import bdv.img.cache.SimpleCacheArrayLoader; +import bdv.img.n5.BdvN5Format; import bdv.img.n5.N5ImageLoader; +import bdv.img.n5.N5Properties; import mpicbg.spim.data.generic.sequence.AbstractSequenceDescription; import mpicbg.spim.data.generic.sequence.BasicImgLoader; import mpicbg.spim.data.generic.sequence.BasicSetupImgLoader; @@ -274,6 +276,7 @@ static class N5DatasetIO< T extends RealType< T > & NativeType< T > > implements private final int timepointId; private final DataType dataType; private final T type; + private final N5Properties n5Properties = new BdvN5Format(); public N5DatasetIO( final N5Writer n5, final Compression compression, final int setupId, final int timepointId, final T type ) { @@ -335,7 +338,7 @@ public 
RandomAccessibleInterval< T > getImage( final int level ) throws IOExcept final long[] dimensions = attributes.getDimensions(); final int[] cellDimensions = attributes.getBlockSize(); final CellGrid grid = new CellGrid( dimensions, cellDimensions ); - final SimpleCacheArrayLoader< ? > cacheArrayLoader = N5ImageLoader.createCacheArrayLoader( n5, pathName ); + final SimpleCacheArrayLoader< ? > cacheArrayLoader = N5ImageLoader.createCacheArrayLoader( n5Properties, n5, pathName ); return new ReadOnlyCachedCellImgFactory().createWithCacheLoader( dimensions, type, key -> { diff --git a/src/main/java/bdv/img/n5/BdvN5Format.java b/src/main/java/bdv/img/n5/BdvN5Format.java index 88bc8cf3..64456218 100644 --- a/src/main/java/bdv/img/n5/BdvN5Format.java +++ b/src/main/java/bdv/img/n5/BdvN5Format.java @@ -28,11 +28,68 @@ */ package bdv.img.n5; -public class BdvN5Format +import org.janelia.saalfeldlab.n5.DataType; +import org.janelia.saalfeldlab.n5.DatasetAttributes; +import org.janelia.saalfeldlab.n5.N5Reader; + +import bdv.img.cache.VolatileCachedCellImg; +import net.imglib2.RandomAccessibleInterval; +import net.imglib2.type.NativeType; + +public class BdvN5Format implements N5Properties { public static final String DOWNSAMPLING_FACTORS_KEY = "downsamplingFactors"; public static final String DATA_TYPE_KEY = "dataType"; + @Override + public String getPath( final int setupId ) + { + return getPathName( setupId ); + } + + @Override + public String getPath( final int setupId, final int timepointId) + { + return getPathName( setupId, timepointId ); + } + + @Override + public String getPath( final int setupId, final int timepointId, final int level) + { + return getPathName( setupId, timepointId, level ); + } + + @Override + public DataType getDataType( final N5Reader n5, final int setupId ) + { + // optionally cached as defined by N5ImageLoader.preFetchDatasetAttributes + return n5.getAttribute( getPath( setupId ), DATA_TYPE_KEY, DataType.class ); + } + + @Override + public 
double[][] getMipmapResolutions( final N5Reader n5, final int setupId ) + { + // optionally cached as defined by N5ImageLoader.preFetchDatasetAttributes + return n5.getAttribute( getPath( setupId ), DOWNSAMPLING_FACTORS_KEY, double[][].class ); + } + + @Override + public DatasetAttributes getDatasetAttributes( final N5Reader n5, final String pathName ) + { + // optionally cached as defined by N5ImageLoader.preFetchDatasetAttributes + return n5.getDatasetAttributes( pathName ); + } + + @Override + public <T extends NativeType<T>> RandomAccessibleInterval<T> extractImg( + final VolatileCachedCellImg<T, ?> img, + final int setupId, + final int timepointId) + { + return img; + } + + // left the old code for compatibility public static String getPathName( final int setupId ) { return String.format( "setup%d", setupId ); diff --git a/src/main/java/bdv/img/n5/N5ImageLoader.java b/src/main/java/bdv/img/n5/N5ImageLoader.java index e4ce3873..d797f9df 100644 --- a/src/main/java/bdv/img/n5/N5ImageLoader.java +++ b/src/main/java/bdv/img/n5/N5ImageLoader.java @@ -28,19 +28,20 @@ */ package bdv.img.n5; -import static bdv.img.n5.BdvN5Format.DATA_TYPE_KEY; -import static bdv.img.n5.BdvN5Format.DOWNSAMPLING_FACTORS_KEY; -import static bdv.img.n5.BdvN5Format.getPathName; - import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.regex.Pattern; +import java.util.stream.IntStream; import org.janelia.saalfeldlab.n5.DataBlock; import org.janelia.saalfeldlab.n5.DataType; @@ -54,6 +55,7 @@ import bdv.cache.CacheControl; import bdv.cache.SharedQueue; import bdv.img.cache.SimpleCacheArrayLoader; +import bdv.img.cache.VolatileCachedCellImg; import bdv.img.cache.VolatileGlobalCellCache; import bdv.util.ConstantRandomAccessible; 
import bdv.util.MipmapTransforms; @@ -62,6 +64,9 @@ import mpicbg.spim.data.generic.sequence.ImgLoaderHint; import mpicbg.spim.data.sequence.MultiResolutionImgLoader; import mpicbg.spim.data.sequence.MultiResolutionSetupImgLoader; +import mpicbg.spim.data.sequence.TimePoint; +import mpicbg.spim.data.sequence.ViewId; +import mpicbg.spim.data.sequence.ViewSetup; import mpicbg.spim.data.sequence.VoxelDimensions; import net.imglib2.Dimensions; import net.imglib2.FinalDimensions; @@ -78,15 +83,24 @@ import net.imglib2.type.NativeType; import net.imglib2.util.Cast; import net.imglib2.util.Intervals; +import net.imglib2.util.Pair; +import net.imglib2.util.ValuePair; import net.imglib2.view.Views; public class N5ImageLoader implements ViewerImgLoader, MultiResolutionImgLoader { + private final static Pattern FILE_SCHEME = Pattern.compile( "file", Pattern.CASE_INSENSITIVE ); + + // TODO: there are cases where one does not want to pre-fetch like distributed processing (or local filesystem?) + public static boolean preFetchDatasetAttributes = true; + public static int cloudThreads = 256; + private final URI n5URI; + protected final N5Properties n5properties; // TODO: it would be good if this would not be needed // find available setups from the n5 - private final AbstractSequenceDescription< ?, ?, ? > seq; + protected final AbstractSequenceDescription< ?, ?, ? > seq; /** * Maps setup id to {@link SetupImgLoader}. @@ -97,6 +111,7 @@ public N5ImageLoader( final URI n5URI, final AbstractSequenceDescription< ?, ?, { this.n5URI = n5URI; this.seq = sequenceDescription; + this.n5properties = createN5PropertiesInstance(); } public N5ImageLoader( final File n5File, final AbstractSequenceDescription< ?, ?, ? 
> sequenceDescription ) @@ -108,6 +123,10 @@ public N5ImageLoader( final N5Reader n5Reader, final URI n5URI, final AbstractSe { this( n5URI, sequenceDescription ); n5 = n5Reader; + + // TODO: if we get something that's not a file and not relative set a different default for numFetcherThreads + if ( n5URI.getScheme() != null && !FILE_SCHEME.asPredicate().test( n5URI.getScheme() ) ) + setNumFetcherThreads( cloudThreads ); } public URI getN5URI() @@ -120,11 +139,84 @@ public File getN5File() return new File( n5URI ); } + public N5Properties getN5properties() + { + return n5properties; + } + + /** + * can be overridden by subclasses (rather than only having the option to provide an instance at construction time) + * + * @return a new N5 Reader instance + */ + public N5Reader instantiateN5Reader() + { + return new N5FSReader( getN5File().getAbsolutePath() ); + } + + /** + * this is only called from a synchronized open() context + */ + public void preFetch() + { + try + { + // touch all metadata in advance in parallel so the N5-API caches them + final ForkJoinPool myPool = new ForkJoinPool( cloudThreads ); + + // prefetch all datatypes and MipmapResolutions + final Collection<? extends BasicViewSetup> vss = seq.getViewSetups().values(); + + myPool.submit(() -> vss.parallelStream().parallel().forEach( setup -> { + n5properties.getDataType( n5, setup.getId() ); + n5properties.getMipmapResolutions( n5, setup.getId() ); + })).join(); + + System.out.println( "Pre-fetched " + vss.size() + " data types and mipmap resolutions."); + + // prefetch all DatasetAttributes for all views + // assemble all tasks first to not having to do nested parallel streams + final ArrayList<Pair<ViewId, Integer>> attribTasks = new ArrayList<>(); + + final Collection< TimePoint > tps = seq.getTimePoints().getTimePoints().values(); + + tps.stream() + .forEach( tp -> vss.stream() // for each TimePoint stream all ViewSetups + .map( vs -> new ViewId( tp.getId(), vs.getId() ) ) // each TimePoint x ViewSetup -> ViewId + .filter( viewId -> 
seq.getMissingViews() == null || !seq.getMissingViews().getMissingViews().contains( viewId ) ) // filter missing ViewIds + .forEach( viewId -> IntStream.range( 0, n5properties.getMipmapResolutions( n5, viewId.getViewSetupId() ).length ) // num MipmapResolutions of ViewId + .forEach( s -> attribTasks.add( new ValuePair<>( viewId, s ) ) ) ) ); // each ViewId x MipmapResolution is a task + + myPool.submit( () -> attribTasks.parallelStream().parallel().forEach( pair -> + n5.getDatasetAttributes( + n5properties.getPath( + pair.getA().getViewSetupId(), + pair.getA().getTimePointId(), + pair.getB() ) ) + )).join(); + + myPool.shutdown(); + + System.out.println( "Pre-fetched " + attribTasks.size() + " dataset attributes."); + } + catch ( Exception e ) + { + // TODO: no drama if this fails ... could be that some data is actually missing, one can still look at what's there + System.out.println( "Prefetching attributes failed: " + e); + } + } + + /** + * @return a class that creates the pathnames for the setupId, timePointId and multiresolution levels + */ + public N5Properties createN5PropertiesInstance() { return new BdvN5Format(); } + private volatile boolean isOpen = false; + private volatile boolean isPrefetched = false; private SharedQueue createdSharedQueue; private VolatileGlobalCellCache cache; - private N5Reader n5; - + protected N5Reader n5; + private boolean preFetch = preFetchDatasetAttributes; private int requestedNumFetcherThreads = -1; private SharedQueue requestedSharedQueue; @@ -141,6 +233,11 @@ public void setCreatedSharedQueue( final SharedQueue createdSharedQueue ) requestedSharedQueue = createdSharedQueue; } + public void setPrefetchDatasetAttributes( final boolean preFetch ) { this.preFetch = preFetch; } + public boolean getPrefetchDatasetAttributes() { return this.preFetch; } + + protected boolean isPrefetched() { return isPrefetched; } + private void open() { if ( !isOpen ) @@ -154,7 +251,14 @@ private void open() { if ( n5 == null ) { - n5 = new 
N5FSReader( getN5File().getAbsolutePath() ); + n5 = instantiateN5Reader(); + } + + if ( !isPrefetched && preFetch ) + { + // pre-fetch once per open(), honoring the per-instance flag (see setPrefetchDatasetAttributes) + preFetch(); + isPrefetched = true; } int maxNumLevels = 0; @@ -185,6 +289,7 @@ private void open() } } + // TODO: the N5 is not re-opened /** * Clear the cache. Images that were obtained from * this loader before {@link #close()} will stop working. Requesting images @@ -206,6 +311,9 @@ public void close() createdSharedQueue = null; isOpen = false; + + // TODO: I guess we should set this to false as well + isPrefetched = false; } } } @@ -219,11 +327,10 @@ public void close() private < T extends NativeType< T >, V extends Volatile< T > & NativeType< V > > SetupImgLoader< T, V > createSetupImgLoader( final int setupId ) throws IOException { - final String pathName = getPathName( setupId ); final DataType dataType; try { - dataType = n5.getAttribute( pathName, DATA_TYPE_KEY, DataType.class ); + dataType = n5properties.getDataType( n5, setupId ); } catch ( final N5Exception e ) { @@ -258,10 +365,9 @@ public SetupImgLoader( final int setupId, final T type, final V volatileType ) t { super( type, volatileType ); this.setupId = setupId; - final String pathName = getPathName( setupId ); try { - mipmapResolutions = n5.getAttribute( pathName, DOWNSAMPLING_FACTORS_KEY, double[][].class ); + mipmapResolutions = n5properties.getMipmapResolutions( n5, setupId ); } catch ( final N5Exception e ) { @@ -289,8 +395,8 @@ public Dimensions getImageSize( final int timepointId, final int level ) { try { - final String pathName = getPathName( setupId, timepointId, level ); - final DatasetAttributes attributes = n5.getDatasetAttributes( pathName ); + final String pathName = n5properties.getPath( setupId, timepointId, level ); + final DatasetAttributes attributes = n5properties.getDatasetAttributes( n5, pathName ); return new FinalDimensions( attributes.getDimensions() ); } catch( final RuntimeException e 
) @@ -330,8 +436,8 @@ private < T extends NativeType< T > > RandomAccessibleInterval< T > prepareCache { try { - final String pathName = getPathName( setupId, timepointId, level ); - final DatasetAttributes attributes = n5.getDatasetAttributes( pathName ); + final String pathName = n5properties.getPath( setupId, timepointId, level ); + final DatasetAttributes attributes = n5properties.getDatasetAttributes( n5, pathName ); final long[] dimensions = attributes.getDimensions(); final int[] cellDimensions = attributes.getBlockSize(); final CellGrid grid = new CellGrid( dimensions, cellDimensions ); @@ -339,8 +445,10 @@ private < T extends NativeType< T > > RandomAccessibleInterval< T > prepareCache final int priority = numMipmapLevels() - 1 - level; final CacheHints cacheHints = new CacheHints( loadingStrategy, priority, false ); - final SimpleCacheArrayLoader< ? > loader = createCacheArrayLoader( n5, pathName ); - return cache.createImg( grid, timepointId, setupId, level, cacheHints, loader, type ); + final SimpleCacheArrayLoader< ? > loader = createCacheArrayLoader( n5properties, n5, pathName ); + final VolatileCachedCellImg<T, ?> img = cache.createImg( grid, timepointId, setupId, level, cacheHints, loader, type ); + + return n5properties.extractImg( img, setupId, timepointId ); } catch ( final IOException | N5Exception e ) { @@ -417,12 +525,12 @@ public A loadArray( final long[] gridPosition, final int[] cellDimensions ) thro } } - public static SimpleCacheArrayLoader< ? > createCacheArrayLoader( final N5Reader n5, final String pathName ) throws IOException + public static SimpleCacheArrayLoader< ? 
> createCacheArrayLoader( final N5Properties n5Properties, final N5Reader n5, final String pathName ) throws IOException { final DatasetAttributes attributes; try { - attributes = n5.getDatasetAttributes( pathName ); + attributes = n5Properties.getDatasetAttributes( n5, pathName ); } catch ( final N5Exception e ) { diff --git a/src/main/java/bdv/img/n5/N5Properties.java b/src/main/java/bdv/img/n5/N5Properties.java new file mode 100644 index 00000000..fef7058f --- /dev/null +++ b/src/main/java/bdv/img/n5/N5Properties.java @@ -0,0 +1,24 @@ +package bdv.img.n5; + +import org.janelia.saalfeldlab.n5.DataType; +import org.janelia.saalfeldlab.n5.DatasetAttributes; +import org.janelia.saalfeldlab.n5.N5Reader; + +import bdv.img.cache.VolatileCachedCellImg; +import net.imglib2.RandomAccessibleInterval; +import net.imglib2.type.NativeType; + +public interface N5Properties +{ + // in case of OME-ZARR, it has an underlying 5D container + public < T extends NativeType<T> > RandomAccessibleInterval<T> extractImg( final VolatileCachedCellImg<T, ?> img, final int setupId, final int timepointId ); + + // give the option to pre-fetch the attributes or store them in the XML + public DatasetAttributes getDatasetAttributes( final N5Reader n5, final String pathName ); + + public DataType getDataType( final N5Reader n5, final int setupId ); + public double[][] getMipmapResolutions( final N5Reader n5, final int setupId ); + public String getPath( final int setupId ); + public String getPath( final int setupId, final int timepointId ); + public String getPath( final int setupId, final int timepointId, final int level ); +}