Commit

do not rely on tarfile
magland committed Aug 3, 2024
1 parent 11174b5 commit dec2e6e
Showing 6 changed files with 218 additions and 55 deletions.
121 changes: 121 additions & 0 deletions examples/benchmark1.py
@@ -0,0 +1,121 @@
import os
import shutil
import gzip
import time

import h5py
import numpy as np
import zarr
import numcodecs
import lindi


def create_dataset(size):
    return np.random.rand(size)


def benchmark_h5py(file_path, num_small_datasets, num_large_datasets, small_size, large_size, compression, mode):
    start_time = time.time()

    if mode == 'dat':
        # Raw binary baseline: no container format at all
        with open(file_path, 'wb') as f:
            # Write small datasets
            print('Writing small datasets')
            for i in range(num_small_datasets):
                data = create_dataset(small_size)
                f.write(data.tobytes())

            # Write large datasets
            print('Writing large datasets')
            for i in range(num_large_datasets):
                data = create_dataset(large_size)
                if compression == 'gzip':
                    data_zipped = gzip.compress(data.tobytes(), compresslevel=4)
                    f.write(data_zipped)
                elif compression is None:
                    f.write(data.tobytes())
                else:
                    raise ValueError(f"Unknown compression: {compression}")
    elif mode == 'zarr':
        if os.path.exists(file_path):
            shutil.rmtree(file_path)
        store = zarr.DirectoryStore(file_path)
        root = zarr.group(store)

        if compression == 'gzip':
            compressor = numcodecs.GZip(level=4)
        else:
            compressor = None

        # Write small datasets
        print('Writing small datasets')
        for i in range(num_small_datasets):
            data = create_dataset(small_size)
            root.create_dataset(f'small_dataset_{i}', data=data)

        # Write large datasets
        print('Writing large datasets')
        for i in range(num_large_datasets):
            data = create_dataset(large_size)
            root.create_dataset(f'large_dataset_{i}', data=data, chunks=(1000,), compressor=compressor)
    else:
        if mode == 'h5':
            f = h5py.File(file_path, 'w')
        else:
            f = lindi.LindiH5pyFile.from_lindi_file(file_path, mode='w')

        # Write small datasets
        print('Writing small datasets')
        for i in range(num_small_datasets):
            data = create_dataset(small_size)
            ds = f.create_dataset(f'small_dataset_{i}', data=data)
            ds.attrs['attr1'] = 1

        # Write large datasets
        print('Writing large datasets')
        for i in range(num_large_datasets):
            data = create_dataset(large_size)
            ds = f.create_dataset(f'large_dataset_{i}', data=data, chunks=(1000,), compression=compression)
            ds.attrs['attr1'] = 1

        f.close()

    end_time = time.time()
    total_time = end_time - start_time

    # Calculate total data size (float64 = 8 bytes per element)
    total_size = (num_small_datasets * small_size + num_large_datasets * large_size) * 8
    total_size_gb = total_size / (1024 ** 3)

    print(f"{mode} benchmark results:")
    print(f"Total time: {total_time:.2f} seconds")
    print(f"Total data size: {total_size_gb:.2f} GB")
    print(f"Write speed: {total_size_gb / total_time:.2f} GB/s")

    if os.path.isdir(file_path):
        # zarr writes a directory store, so sum the sizes of the files it contains
        file_size = sum(
            os.path.getsize(os.path.join(dirpath, fname))
            for dirpath, _, fnames in os.walk(file_path)
            for fname in fnames
        )
    else:
        file_size = os.path.getsize(file_path)
    print(f"File size: {file_size / (1024 ** 3):.2f} GB")

    return total_time, total_size_gb


if __name__ == "__main__":
    file_path_h5 = "benchmark.h5"
    file_path_lindi = "benchmark.lindi"
    file_path_dat = "benchmark.dat"
    file_path_zarr = "benchmark.zarr"
    num_small_datasets = 0
    num_large_datasets = 5
    small_size = 1000
    large_size = 10000000
    compression = None  # 'gzip' or None

    print('Zarr Benchmark')
    zarr_time, total_size = benchmark_h5py(file_path_zarr, num_small_datasets, num_large_datasets, small_size, large_size, compression=compression, mode='zarr')
    print('')
    print('Lindi Benchmark')
    lindi_time, total_size = benchmark_h5py(file_path_lindi, num_small_datasets, num_large_datasets, small_size, large_size, compression=compression, mode='lindi')
    print('')
    print('H5PY Benchmark')
    h5py_time, total_size = benchmark_h5py(file_path_h5, num_small_datasets, num_large_datasets, small_size, large_size, compression=compression, mode='h5')
    print('')
    print('DAT Benchmark')
    dat_time, total_size = benchmark_h5py(file_path_dat, num_small_datasets, num_large_datasets, small_size, large_size, compression=compression, mode='dat')
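
A quick way to sanity-check the benchmark output is to read one dataset back through the same h5py-style API. This is a hedged sketch, not part of the commit; it assumes benchmark1.py has already been run in the current directory and that from_lindi_file accepts mode='r'.

import lindi

f = lindi.LindiH5pyFile.from_lindi_file("benchmark.lindi", mode="r")
ds = f["large_dataset_0"]
print(ds.shape, ds.dtype, ds.attrs["attr1"])  # expect (10000000,) float64 1
f.close()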
1 change: 1 addition & 0 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
@@ -400,6 +400,7 @@ def flush(self):
        rfs = self.to_reference_file_system()
        if self._source_tar_file:
            self._source_tar_file.write_rfs(rfs)
+            self._source_tar_file._update_index()  # very important: refresh the tar index after writing
        else:
            _write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path)
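
The commit title ("do not rely on tarfile") points at why the added line matters: LindiTarFile keeps its own index of tar members rather than re-scanning the archive with Python's tarfile module, so any write that appends or rewrites a member must be followed by an index refresh. The toy sketch below (not lindi's actual format or API) illustrates why an appender that keeps a side index has to rewrite it after every write.

import json
import os

def append_blob(path: str, name: str, payload: bytes):
    # Append a named blob to a data file and keep a JSON side index
    # (offset/size per name) in sync -- a toy analogue of _update_index().
    index_path = path + ".index.json"
    index = {}
    if os.path.exists(index_path):
        with open(index_path) as f:
            index = json.load(f)
    with open(path, "ab") as f:
        offset = f.seek(0, 2)  # seek to end; this is where the blob lands
        f.write(payload)
    index[name] = {"offset": offset, "size": len(payload)}
    # Without this rewrite, a reader consulting the index would never
    # see the blob that was just appended.
    with open(index_path, "w") as f:
        json.dump(index, f)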

2 changes: 2 additions & 0 deletions lindi/LindiH5pyFile/writers/LindiH5pyGroupWriter.py
@@ -83,6 +83,8 @@ def create_dataset(
                _zarr_compressor = numcodecs.GZip(level=level)
            else:
                raise Exception(f'Compression {compression} is not supported')
+        elif compression is None:
+            _zarr_compressor = None
        else:
            raise Exception(f'Unexpected type for compression: {type(compression)}')
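
Taken together, these branches implement a small dispatch from an h5py-style compression argument to a numcodecs compressor, with None now meaning "store chunks uncompressed". A standalone sketch of the same logic (the helper name is invented for illustration):

import numcodecs

def to_zarr_compressor(compression, level: int = 4):
    # Mirrors the create_dataset branches: 'gzip' -> GZip codec,
    # None -> no compressor, anything else -> error.
    if isinstance(compression, str):
        if compression == 'gzip':
            return numcodecs.GZip(level=level)
        raise Exception(f'Compression {compression} is not supported')
    elif compression is None:
        return None
    else:
        raise Exception(f'Unexpected type for compression: {type(compression)}')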

16 changes: 8 additions & 8 deletions lindi/conversion/create_zarr_dataset_from_h5_data.py
@@ -19,7 +19,7 @@ def create_zarr_dataset_from_h5_data(
    name: str,
    label: str,
    h5_chunks: Union[Tuple, None],
-    zarr_compressor: Union[Codec, Literal['default']] = 'default'
+    zarr_compressor: Union[Codec, Literal['default'], None] = 'default'
):
    """Create a zarr dataset from an h5py dataset.
@@ -43,9 +43,9 @@
        The name of the h5py dataset for error messages.
    h5_chunks : tuple
        The chunk shape of the h5py dataset.
-    zarr_compressor : numcodecs.abc.Codec
+    zarr_compressor : numcodecs.abc.Codec, 'default', or None
        The codec compressor to use when writing the dataset. If default, the
-        default compressor will be used.
+        default compressor will be used. When None, no compressor will be used.
    """
    if h5_dtype is None:
        raise Exception(f'No dtype in h5_to_zarr_dataset_prep for dataset {label}')
@@ -58,7 +58,7 @@
        if h5_data is None:
            raise Exception(f'Data must be provided for scalar dataset {label}')

-        if zarr_compressor != 'default':
+        if zarr_compressor != 'default' and zarr_compressor is not None:
            raise Exception('zarr_compressor is not supported for scalar datasets')

        if _is_numeric_dtype(h5_dtype) or h5_dtype in [bool, np.bool_]:
@@ -131,7 +131,7 @@
        )
    elif h5_dtype.kind == 'O':
        # For type object, we are going to use the JSON codec
-        if zarr_compressor != 'default':
+        if zarr_compressor != 'default' and zarr_compressor is not None:
            raise Exception('zarr_compressor is not supported for object datasets')
        if h5_data is not None:
            if isinstance(h5_data, h5py.Dataset):
@@ -149,7 +149,7 @@
                object_codec=object_codec
            )
    elif h5_dtype.kind == 'S':  # byte string
-        if zarr_compressor != 'default':
+        if zarr_compressor != 'default' and zarr_compressor is not None:
            raise Exception('zarr_compressor is not supported for byte string datasets')
        if h5_data is None:
            raise Exception(f'Data must be provided when converting dataset {label} with dtype {h5_dtype}')
@@ -161,11 +161,11 @@
            data=h5_data
        )
    elif h5_dtype.kind == 'U':  # unicode string
-        if zarr_compressor != 'default':
+        if zarr_compressor != 'default' and zarr_compressor is not None:
            raise Exception('zarr_compressor is not supported for unicode string datasets')
        raise Exception(f'Array of unicode strings not supported: dataset {label} with dtype {h5_dtype} and shape {h5_shape}')
    elif h5_dtype.kind == 'V' and h5_dtype.fields is not None:  # compound dtype
-        if zarr_compressor != 'default':
+        if zarr_compressor != 'default' and zarr_compressor is not None:
            raise Exception('zarr_compressor is not supported for compound datasets')
        if h5_data is None:
            raise Exception(f'Data must be provided when converting compound dataset {label}')
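
After this change the guard reads the same way in five branches: an explicit Codec is rejected for scalar, object, byte-string, unicode-string, and compound datasets, while both 'default' and None pass through. A hypothetical helper (not part of the diff) capturing the repeated check:

def check_zarr_compressor_supported(zarr_compressor, what: str):
    # 'default' and None are always acceptable; an explicit Codec is only
    # meaningful for numeric datasets.
    if zarr_compressor != 'default' and zarr_compressor is not None:
        raise Exception(f'zarr_compressor is not supported for {what} datasets')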
Expand Down
4 changes: 4 additions & 0 deletions lindi/tar/LindiTarStore.py
@@ -1,4 +1,5 @@
import random
+import numpy as np
from zarr.storage import Store as ZarrStore
from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore
from .lindi_tar import LindiTarFile
@@ -19,7 +20,10 @@ def __setitem__(self, key: str, value: bytes):
            inline = True
        else:
            # presumably it is a chunk of an array
+            if isinstance(value, np.ndarray):
+                value = value.tobytes()
            if not isinstance(value, bytes):
+                print(f"key: {key}, value type: {type(value)}")
                raise ValueError("Value must be bytes")
            size = len(value)
            inline = size < 1000  # this should be a configurable threshold
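
The new isinstance branch handles the case where the zarr machinery hands the store an ndarray chunk rather than raw bytes. A small self-contained sketch of the conversion the added lines perform, and of how such bytes round-trip:

import numpy as np

chunk = np.arange(6, dtype=np.float64).reshape(2, 3)
raw = chunk.tobytes()  # what the store serializes (C-order)
back = np.frombuffer(raw, dtype=chunk.dtype).reshape(chunk.shape)
assert np.array_equal(chunk, back)
print(len(raw))  # 48 bytes -- far below the 1000-byte inline threshold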
(diff for 1 more changed file not shown)