Skip to content

Commit

Permalink
introduced an abstraction layer for all N5 dataset parameters and to …
Browse files Browse the repository at this point in the history
…potentially expose parts of the image (5D -> 3D for OME-ZARR). This allows us to use the N5ImageLoader directly, for example for reading OME-ZARR-based datasets. It also supports pre-fetching all dataset attributes; skipping this pre-fetch can slow down cloud processing.

I also added an instantiateN5Reader() method.
  • Loading branch information
StephanPreibisch committed Oct 31, 2024
1 parent 4d308f7 commit 714a04e
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 22 deletions.
5 changes: 4 additions & 1 deletion src/main/java/bdv/export/n5/WriteSequenceToN5.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import bdv.export.ProgressWriterNull;
import bdv.export.SubTaskProgressWriter;
import bdv.img.cache.SimpleCacheArrayLoader;
import bdv.img.n5.BdvN5Format;
import bdv.img.n5.N5ImageLoader;
import bdv.img.n5.N5Properties;
import mpicbg.spim.data.generic.sequence.AbstractSequenceDescription;
import mpicbg.spim.data.generic.sequence.BasicImgLoader;
import mpicbg.spim.data.generic.sequence.BasicSetupImgLoader;
Expand Down Expand Up @@ -274,6 +276,7 @@ static class N5DatasetIO< T extends RealType< T > & NativeType< T > > implements
private final int timepointId;
private final DataType dataType;
private final T type;
private final N5Properties n5Properties = new BdvN5Format();

public N5DatasetIO( final N5Writer n5, final Compression compression, final int setupId, final int timepointId, final T type )
{
Expand Down Expand Up @@ -335,7 +338,7 @@ public RandomAccessibleInterval< T > getImage( final int level ) throws IOExcept
final long[] dimensions = attributes.getDimensions();
final int[] cellDimensions = attributes.getBlockSize();
final CellGrid grid = new CellGrid( dimensions, cellDimensions );
final SimpleCacheArrayLoader< ? > cacheArrayLoader = N5ImageLoader.createCacheArrayLoader( n5, pathName );
final SimpleCacheArrayLoader< ? > cacheArrayLoader = N5ImageLoader.createCacheArrayLoader( n5Properties, n5, pathName );
return new ReadOnlyCachedCellImgFactory().createWithCacheLoader(
dimensions, type,
key -> {
Expand Down
59 changes: 58 additions & 1 deletion src/main/java/bdv/img/n5/BdvN5Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,68 @@
*/
package bdv.img.n5;

public class BdvN5Format
import org.janelia.saalfeldlab.n5.DataType;
import org.janelia.saalfeldlab.n5.DatasetAttributes;
import org.janelia.saalfeldlab.n5.N5Reader;

import bdv.img.cache.VolatileCachedCellImg;
import net.imglib2.RandomAccessibleInterval;
import net.imglib2.type.NativeType;

public class BdvN5Format implements N5Properties
{
public static final String DOWNSAMPLING_FACTORS_KEY = "downsamplingFactors";
public static final String DATA_TYPE_KEY = "dataType";

/**
 * Returns the path for the given setup.
 * Delegates to the static {@link #getPathName(int)}.
 *
 * @param setupId id of the view setup
 * @return the path produced by {@code getPathName( setupId )}
 */
@Override
public String getPath( final int setupId )
{
	return BdvN5Format.getPathName( setupId );
}

/**
 * Returns the path for the given setup and timepoint.
 * Delegates to the static two-argument {@code getPathName}.
 *
 * @param setupId id of the view setup
 * @param timepointId id of the timepoint
 * @return the path produced by {@code getPathName( setupId, timepointId )}
 */
@Override
public String getPath( final int setupId, final int timepointId )
{
	return BdvN5Format.getPathName( setupId, timepointId );
}

/**
 * Returns the path for the given setup, timepoint and resolution level.
 * Delegates to the static three-argument {@code getPathName}.
 *
 * @param setupId id of the view setup
 * @param timepointId id of the timepoint
 * @param level resolution level
 * @return the path produced by {@code getPathName( setupId, timepointId, level )}
 */
@Override
public String getPath( final int setupId, final int timepointId, final int level )
{
	return BdvN5Format.getPathName( setupId, timepointId, level );
}

/**
 * Reads the {@link #DATA_TYPE_KEY} attribute stored at the path of the
 * given setup.
 * <p>
 * The lookup goes straight to the reader; per the original note, the value
 * is optionally cached as defined by
 * {@code N5ImageLoader.preFetchDatasetAttributes}.
 *
 * @param n5 reader to query
 * @param setupId id of the view setup
 * @return whatever {@code n5.getAttribute} returns for that key
 */
@Override
public DataType getDataType( final N5Reader n5, final int setupId )
{
	final String setupPath = getPath( setupId );
	return n5.getAttribute( setupPath, DATA_TYPE_KEY, DataType.class );
}

/**
 * Reads the {@link #DOWNSAMPLING_FACTORS_KEY} attribute stored at the path
 * of the given setup.
 * <p>
 * The lookup goes straight to the reader; per the original note, the value
 * is optionally cached as defined by
 * {@code N5ImageLoader.preFetchDatasetAttributes}.
 *
 * @param n5 reader to query
 * @param setupId id of the view setup
 * @return whatever {@code n5.getAttribute} returns for that key
 */
@Override
public double[][] getMipmapResolutions( final N5Reader n5, final int setupId )
{
	final String setupPath = getPath( setupId );
	return n5.getAttribute( setupPath, DOWNSAMPLING_FACTORS_KEY, double[][].class );
}

/**
 * Returns the attributes of the dataset at {@code pathName} exactly as the
 * reader reports them.
 * <p>
 * Per the original note, the value is optionally cached as defined by
 * {@code N5ImageLoader.preFetchDatasetAttributes}.
 *
 * @param n5 reader to query
 * @param pathName path of the dataset inside the container
 * @return the result of {@code n5.getDatasetAttributes( pathName )}
 */
@Override
public DatasetAttributes getDatasetAttributes( final N5Reader n5, final String pathName )
{
	final DatasetAttributes attributes = n5.getDatasetAttributes( pathName );
	return attributes;
}

/**
 * Returns the part of the cached image that is exposed for the given view.
 * <p>
 * This implementation hands back {@code img} unchanged; {@code setupId} and
 * {@code timepointId} are not used here.
 *
 * @param img the cached cell image created for the dataset
 * @param setupId id of the view setup (unused)
 * @param timepointId id of the timepoint (unused)
 * @return {@code img} itself
 */
@Override
public < T extends NativeType< T > > RandomAccessibleInterval< T > extractImg(
		final VolatileCachedCellImg< T, ? > img,
		final int setupId,
		final int timepointId )
{
	return img;
}

// left the old code for compatibility
public static String getPathName( final int setupId )
{
return String.format( "setup%d", setupId );
Expand Down
148 changes: 128 additions & 20 deletions src/main/java/bdv/img/n5/N5ImageLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@
*/
package bdv.img.n5;

import static bdv.img.n5.BdvN5Format.DATA_TYPE_KEY;
import static bdv.img.n5.BdvN5Format.DOWNSAMPLING_FACTORS_KEY;
import static bdv.img.n5.BdvN5Format.getPathName;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import org.janelia.saalfeldlab.n5.DataBlock;
import org.janelia.saalfeldlab.n5.DataType;
Expand All @@ -54,6 +55,7 @@
import bdv.cache.CacheControl;
import bdv.cache.SharedQueue;
import bdv.img.cache.SimpleCacheArrayLoader;
import bdv.img.cache.VolatileCachedCellImg;
import bdv.img.cache.VolatileGlobalCellCache;
import bdv.util.ConstantRandomAccessible;
import bdv.util.MipmapTransforms;
Expand All @@ -62,6 +64,9 @@
import mpicbg.spim.data.generic.sequence.ImgLoaderHint;
import mpicbg.spim.data.sequence.MultiResolutionImgLoader;
import mpicbg.spim.data.sequence.MultiResolutionSetupImgLoader;
import mpicbg.spim.data.sequence.TimePoint;
import mpicbg.spim.data.sequence.ViewId;
import mpicbg.spim.data.sequence.ViewSetup;
import mpicbg.spim.data.sequence.VoxelDimensions;
import net.imglib2.Dimensions;
import net.imglib2.FinalDimensions;
Expand All @@ -78,15 +83,24 @@
import net.imglib2.type.NativeType;
import net.imglib2.util.Cast;
import net.imglib2.util.Intervals;
import net.imglib2.util.Pair;
import net.imglib2.util.ValuePair;
import net.imglib2.view.Views;

public class N5ImageLoader implements ViewerImgLoader, MultiResolutionImgLoader
{
private final static Pattern FILE_SCHEME = Pattern.compile( "file", Pattern.CASE_INSENSITIVE );

// TODO: there are cases where one does not want to pre-fetch like distributed processing (or local filesystem?)
public static boolean preFetchDatasetAttributes = true;
public static int cloudThreads = 256;

private final URI n5URI;
protected final N5Properties n5properties;

// TODO: it would be good if this would not be needed
// find available setups from the n5
private final AbstractSequenceDescription< ?, ?, ? > seq;
protected final AbstractSequenceDescription< ?, ?, ? > seq;

/**
* Maps setup id to {@link SetupImgLoader}.
Expand All @@ -97,6 +111,7 @@ public N5ImageLoader( final URI n5URI, final AbstractSequenceDescription< ?, ?,
{
this.n5URI = n5URI;
this.seq = sequenceDescription;
this.n5properties = createN5PropertiesInstance();
}

public N5ImageLoader( final File n5File, final AbstractSequenceDescription< ?, ?, ? > sequenceDescription )
Expand All @@ -108,6 +123,10 @@ public N5ImageLoader( final N5Reader n5Reader, final URI n5URI, final AbstractSe
{
this( n5URI, sequenceDescription );
n5 = n5Reader;

// TODO: if we get something that's not a file and not relative set a different default for numFetcherThreads
if ( n5URI.getScheme() != null && !FILE_SCHEME.asPredicate().test( n5URI.getScheme() ) )
setNumFetcherThreads( cloudThreads );
}

public URI getN5URI()
Expand All @@ -120,11 +139,84 @@ public File getN5File()
return new File( n5URI );
}

/**
 * @return the {@link N5Properties} instance that was created for this
 *         loader in the constructor
 */
public N5Properties getN5properties()
{
	return this.n5properties;
}

/**
 * Creates the reader that {@code open()} uses when no {@link N5Reader} has
 * been set yet. Subclasses can override this instead of having to supply a
 * reader instance at construction time.
 * <p>
 * This implementation opens the absolute path of {@link #getN5File()} with
 * an {@code N5FSReader}.
 *
 * @return a new N5 Reader instance
 */
public N5Reader instantiateN5Reader()
{
	final String n5Path = getN5File().getAbsolutePath();
	return new N5FSReader( n5Path );
}

/**
 * Touches all metadata in advance, in parallel, so that later lookups can
 * be answered from whatever caching the N5 API / {@link N5Properties}
 * implementation provides.
 * <p>
 * Two passes are made on a temporary {@link ForkJoinPool} sized by
 * {@link #cloudThreads}:
 * <ol>
 * <li>data type and mipmap resolutions of every view setup,</li>
 * <li>dataset attributes of every (timepoint, setup, level) combination
 * that is not listed as a missing view.</li>
 * </ol>
 * Failure is deliberately non-fatal: any exception is reported on standard
 * output and the method returns normally, because some data may simply be
 * missing while the rest can still be viewed.
 * <p>
 * {@code open()} invokes this method; the original author notes that it is
 * only called from a synchronized {@code open()} context.
 */
public void preFetch()
{
	try
	{
		// the parallel streams below are submitted to this pool instead of being run directly
		final ForkJoinPool myPool = new ForkJoinPool( cloudThreads );

		try
		{
			// prefetch all datatypes and MipmapResolutions
			final Collection< ? extends BasicViewSetup > vss = seq.getViewSetups().values();

			myPool.submit( () -> vss.parallelStream().forEach( setup -> {
				n5properties.getDataType( n5, setup.getId() );
				n5properties.getMipmapResolutions( n5, setup.getId() );
			} ) ).join();

			System.out.println( "Pre-fetched " + vss.size() + " data types and mipmap resolutions.");

			// prefetch all DatasetAttributes for all views;
			// assemble all tasks first so no nested parallel streams are needed
			final List< Pair< ViewId, Integer > > attribTasks = new ArrayList<>();

			final Collection< TimePoint > tps = seq.getTimePoints().getTimePoints().values();

			for ( final TimePoint tp : tps )
			{
				for ( final BasicViewSetup vs : vss )
				{
					final ViewId viewId = new ViewId( tp.getId(), vs.getId() );

					// skip views that are declared missing
					if ( seq.getMissingViews() != null && seq.getMissingViews().getMissingViews().contains( viewId ) )
						continue;

					// each ViewId x MipmapResolution is a task
					final int numLevels = n5properties.getMipmapResolutions( n5, viewId.getViewSetupId() ).length;
					for ( int level = 0; level < numLevels; ++level )
						attribTasks.add( new ValuePair<>( viewId, level ) );
				}
			}

			// go through n5properties (not n5 directly) so the same lookup path is
			// exercised that getImageSize() / prepareCachedImage() use later
			myPool.submit( () -> attribTasks.parallelStream().forEach( pair ->
				n5properties.getDatasetAttributes(
						n5,
						n5properties.getPath(
								pair.getA().getViewSetupId(),
								pair.getA().getTimePointId(),
								pair.getB() ) )
			) ).join();

			System.out.println( "Pre-fetched " + attribTasks.size() + " dataset attributes.");
		}
		finally
		{
			// always release the worker threads, also when a task above failed
			myPool.shutdown();
		}
	}
	catch ( Exception e )
	{
		// TODO: no drama if this fails ... could be that some data is actually missing, one can still look at what's there
		System.out.println( "Prefetching attributes failed: " + e);
	}
}

/**
 * Factory for the {@link N5Properties} used by this loader; the default is
 * a {@link BdvN5Format}.
 * <p>
 * Note that the constructor calls this method to initialise
 * {@code n5properties}, so an override runs before the subclass
 * constructor body and should not depend on subclass fields.
 *
 * @return a class that creates the pathnames for the setupId, timePointId and multiresolution levels
 */
public N5Properties createN5PropertiesInstance()
{
	return new BdvN5Format();
}

private volatile boolean isOpen = false;
private volatile boolean isPrefetched = false;
private SharedQueue createdSharedQueue;
private VolatileGlobalCellCache cache;
private N5Reader n5;

protected N5Reader n5;
private boolean preFetch = preFetchDatasetAttributes;

private int requestedNumFetcherThreads = -1;
private SharedQueue requestedSharedQueue;
Expand All @@ -141,6 +233,11 @@ public void setCreatedSharedQueue( final SharedQueue createdSharedQueue )
requestedSharedQueue = createdSharedQueue;
}

/**
 * Sets the per-instance pre-fetch flag. The flag's initial value is taken
 * from the static {@code preFetchDatasetAttributes}.
 *
 * @param preFetch new value of the per-instance flag
 */
public void setPrefetchDatasetAttributes( final boolean preFetch )
{
	this.preFetch = preFetch;
}

/**
 * @return the current value of the per-instance pre-fetch flag
 */
public boolean getPrefetchDatasetAttributes()
{
	return preFetch;
}

/**
 * @return {@code true} once {@code open()} has run {@code preFetch()};
 *         {@code close()} resets it to {@code false}
 */
protected boolean isPrefetched()
{
	return this.isPrefetched;
}

private void open()
{
if ( !isOpen )
Expand All @@ -154,7 +251,14 @@ private void open()
{
if ( n5 == null )
{
n5 = new N5FSReader( getN5File().getAbsolutePath() );
n5 = instantiateN5Reader();
}

// Use the per-instance flag (changeable through setPrefetchDatasetAttributes)
// rather than the static preFetchDatasetAttributes, which only supplies the
// flag's initial value; otherwise the setter would have no effect here.
if ( !isPrefetched && preFetch )
{
	// pre-fetch once per open(); close() resets isPrefetched
	preFetch();
	isPrefetched = true;
}

int maxNumLevels = 0;
Expand Down Expand Up @@ -185,6 +289,7 @@ private void open()
}
}

// TODO: the N5 is not re-opened
/**
* Clear the cache. Images that were obtained from
* this loader before {@link #close()} will stop working. Requesting images
Expand All @@ -206,6 +311,9 @@ public void close()

createdSharedQueue = null;
isOpen = false;

// TODO: I guess we should set this to false as well
isPrefetched = false;
}
}
}
Expand All @@ -219,11 +327,10 @@ public void close()

private < T extends NativeType< T >, V extends Volatile< T > & NativeType< V > > SetupImgLoader< T, V > createSetupImgLoader( final int setupId ) throws IOException
{
final String pathName = getPathName( setupId );
final DataType dataType;
try
{
dataType = n5.getAttribute( pathName, DATA_TYPE_KEY, DataType.class );
dataType = n5properties.getDataType( n5, setupId );
}
catch ( final N5Exception e )
{
Expand Down Expand Up @@ -258,10 +365,9 @@ public SetupImgLoader( final int setupId, final T type, final V volatileType ) t
{
super( type, volatileType );
this.setupId = setupId;
final String pathName = getPathName( setupId );
try
{
mipmapResolutions = n5.getAttribute( pathName, DOWNSAMPLING_FACTORS_KEY, double[][].class );
mipmapResolutions = n5properties.getMipmapResolutions( n5, setupId );
}
catch ( final N5Exception e )
{
Expand Down Expand Up @@ -289,8 +395,8 @@ public Dimensions getImageSize( final int timepointId, final int level )
{
try
{
final String pathName = getPathName( setupId, timepointId, level );
final DatasetAttributes attributes = n5.getDatasetAttributes( pathName );
final String pathName = n5properties.getPath( setupId, timepointId, level );
final DatasetAttributes attributes = n5properties.getDatasetAttributes( n5, pathName );
return new FinalDimensions( attributes.getDimensions() );
}
catch( final RuntimeException e )
Expand Down Expand Up @@ -330,17 +436,19 @@ private < T extends NativeType< T > > RandomAccessibleInterval< T > prepareCache
{
try
{
final String pathName = getPathName( setupId, timepointId, level );
final DatasetAttributes attributes = n5.getDatasetAttributes( pathName );
final String pathName = n5properties.getPath( setupId, timepointId, level );
final DatasetAttributes attributes = n5properties.getDatasetAttributes( n5, pathName );
final long[] dimensions = attributes.getDimensions();
final int[] cellDimensions = attributes.getBlockSize();
final CellGrid grid = new CellGrid( dimensions, cellDimensions );

final int priority = numMipmapLevels() - 1 - level;
final CacheHints cacheHints = new CacheHints( loadingStrategy, priority, false );

final SimpleCacheArrayLoader< ? > loader = createCacheArrayLoader( n5, pathName );
return cache.createImg( grid, timepointId, setupId, level, cacheHints, loader, type );
final SimpleCacheArrayLoader< ? > loader = createCacheArrayLoader( n5properties, n5, pathName );
final VolatileCachedCellImg<T, ?> img = cache.createImg( grid, timepointId, setupId, level, cacheHints, loader, type );

return n5properties.extractImg( img, setupId, timepointId );
}
catch ( final IOException | N5Exception e )
{
Expand Down Expand Up @@ -417,12 +525,12 @@ public A loadArray( final long[] gridPosition, final int[] cellDimensions ) thro
}
}

public static SimpleCacheArrayLoader< ? > createCacheArrayLoader( final N5Reader n5, final String pathName ) throws IOException
public static SimpleCacheArrayLoader< ? > createCacheArrayLoader( final N5Properties n5Properties, final N5Reader n5, final String pathName ) throws IOException
{
final DatasetAttributes attributes;
try
{
attributes = n5.getDatasetAttributes( pathName );
attributes = n5Properties.getDatasetAttributes( n5, pathName );
}
catch ( final N5Exception e )
{
Expand Down
Loading

0 comments on commit 714a04e

Please sign in to comment.