Improve loading performance (#43)
* direct copying

* improve specialization

* add assert
nhz2 authored Nov 28, 2023
1 parent 04dd43e commit 074728b
Showing 6 changed files with 186 additions and 219 deletions.
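In short, as the diffs below show: chunk decompression now writes directly into a caller-owned, element-typed buffer instead of returning a fresh Vector{UInt8}; the per-array chunk loop moves into a new load_array function so it specializes on element type and dimensionality; and saving gains an assert that the dtype is in native byte order, which lets the per-byte shuffle loops become single direct copies. The API change at a glance (signatures copied from the diffs below, bodies elided):

decompress!(buffer::Vector{UInt8}, src::Vector{UInt8}, metadata::ParsedMetaData)::Vector{UInt8}   # before: allocates or aliases, caller copies afterwards
unsafe_decompress!(p::Ptr{UInt8}, n::Int, src::Vector{UInt8}, compressor)::Nothing                # after: fills exactly n bytes at p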
43 changes: 21 additions & 22 deletions src/compression.jl
@@ -10,33 +10,32 @@ const BLOSC_MAX_BUFFERSIZE = typemax(Cint) - BLOSC_MAX_OVERHEAD
 import TranscodingStreams, CodecZlib

 """
-Return the uncompressed data.
-May return src if no compression was used, or buffer, if compression was used.
-`src` is the compressed data.
-`buffer` is a buffer used to avoid allocations, it may be resized and returned.
+Uncompress the data.
 """
-function decompress!(buffer::Vector{UInt8}, src::Vector{UInt8}, metadata::ParsedMetaData)::Vector{UInt8}
-    expected_output_size = prod(metadata.chunks)*metadata.dtype.zarr_size
-    @argcheck expected_output_size > 0
-    if isnothing(metadata.compressor)
-        return src
-    end
-    id = metadata.compressor.id
-    if id == "blosc"
+@noinline function unsafe_decompress!(p::Ptr{UInt8}, n::Int, src::Vector{UInt8}, compressor)::Nothing
+    @argcheck n > 0
+    if !isnothing(compressor) && compressor.id == "blosc"
         numinternalthreads = 1
-        buffer = Vector{UInt8}(undef, expected_output_size)
         sz = ccall((:blosc_decompress_ctx,Blosc_jll.libblosc), Cint,
-            (Ptr{Cvoid},Ptr{Cvoid},Csize_t,Cint), src, buffer, expected_output_size, numinternalthreads)
-        sz == expected_output_size || error("Blosc decompress error, compressed data is corrupted")
-        buffer
-    elseif id == "zlib"
-        TranscodingStreams.transcode(CodecZlib.ZlibDecompressor, src)
-    elseif id == "gzip"
-        TranscodingStreams.transcode(CodecZlib.GzipDecompressor, src)
+            (Ptr{Cvoid},Ptr{Cvoid},Csize_t,Cint), src, p, n, numinternalthreads)
+        sz == n || error("Blosc decompress error, compressed data is corrupted")
+        return
+    end
+    r = if isnothing(compressor)
+        src
     else
-        error("$(id) compressor not supported yet")
+        id = compressor.id
+        if id == "zlib"
+            TranscodingStreams.transcode(CodecZlib.ZlibDecompressor, src)
+        elseif id == "gzip"
+            TranscodingStreams.transcode(CodecZlib.GzipDecompressor, src)
+        else
+            error("$(id) compressor not supported yet")
+        end
    end
+    @argcheck length(r) == n
+    GC.@preserve r Base.unsafe_copyto!(p, Base.unsafe_convert(Ptr{UInt8}, r), n)
+    nothing
 end

 """
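To make the new calling convention concrete: the caller now owns an element-typed buffer and hands the decompressor a raw pointer into it. A minimal, self-contained sketch of that pattern in plain Base Julia — demo_fill! is a hypothetical stand-in for blosc_decompress_ctx, and the real call site is in src/loading.jl below:

# A stand-in that fills exactly `n` bytes at `p`, as unsafe_decompress! promises.
function demo_fill!(p::Ptr{UInt8}, n::Int, src::Vector{UInt8})
    @assert length(src) == n
    GC.@preserve src Base.unsafe_copyto!(p, pointer(src), n)
    nothing
end

out = Vector{Float64}(undef, 3)        # typed destination, like decompressed_chunkdata
raw = rand(UInt8, sizeof(out))         # pretend these are the uncompressed bytes
GC.@preserve out demo_fill!(Base.unsafe_convert(Ptr{UInt8}, out), sizeof(out), raw)
@assert reinterpret(UInt8, out) == raw # bytes landed in place, no intermediate buffer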
137 changes: 75 additions & 62 deletions src/loading.jl
@@ -42,7 +42,7 @@ function load_dir(reader::AbstractReader)::ZGroup
     output = ZGroup()
     keynames = key_names(reader)
     splitkeys = map(x->split(x,'/';keepempty=false), keynames)
-    keyname_dict = Dict(zip(keynames,eachindex(keynames)))
+    keyname_dict::Dict{String, Int} = Dict{String, Int}(zip(keynames,eachindex(keynames)))
     try_add_attrs!(output, reader, keyname_dict, "")
     for splitkey in sort(splitkeys)
         if length(splitkey) < 2
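The Dict annotation is the "improve specialization" part: if key_names(reader) does not infer to Vector{String}, the untyped Dict(zip(...)) constructor propagates abstract type parameters into every later lookup. A toy illustration in plain Base Julia (hypothetical data, not the package's reader):

keynames = Any["a/.zarray", "a/0"]               # simulate a poorly-inferred key list
loose = Dict(zip(keynames, eachindex(keynames))) # key type leaks: Dict{Any, Int}
tight = Dict{String, Int}(zip(keynames, eachindex(keynames)))
@assert loose isa Dict{Any, Int}
@assert tight isa Dict{String, Int}              # concrete keys regardless of input inference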
@@ -56,73 +56,86 @@
             arrayname = join(splitkey[begin:end-1],'/')
             arrayidx = keyname_dict[arrayname*"/.zarray"]
             metadata = parse_zarr_metadata(JSON3.read(read_key_idx(reader, arrayidx)))
-            fill_value = reinterpret(metadata.dtype.julia_type, metadata.fill_value)[1]
-            shape, chunks = if metadata.is_column_major
-                metadata.shape, metadata.chunks
-            else
-                reverse(metadata.shape), reverse(metadata.chunks)
-            end
-            array = fill(fill_value, shape...)
-            zarr_size = metadata.dtype.zarr_size
-            julia_size = metadata.dtype.julia_size
+            fill_value = metadata.fill_value
+            zarray = load_array(
+                fill_value,
+                Tuple(metadata.shape),
+                Tuple(metadata.chunks),
+                arrayname,
+                metadata.dimension_separator,
+                keyname_dict,
+                reader,
+                metadata.dtype.in_native_order,
+                metadata.is_column_major,
+                metadata.compressor,
+            )

-            # If there is no actual data don't load chunks
-            if !(any(==(0), shape) || julia_size == 0 || zarr_size == 0)
-                # load chunks
-                for chunkidx in CartesianIndices(Tuple(cld.(shape,chunks)))
-                    chunktuple = Tuple(chunkidx) .- 1
-                    chunknametuple = if metadata.is_column_major
-                        chunktuple
-                    else
-                        #shape and chunks have been pre reversed so reverse chunkidx as well.
-                        reverse(chunktuple)
-                    end
-                    # empty chunk has name "0" this is the case for zero dim arrays
-                    chunkname = arrayname*"/"*(isempty(chunknametuple) ? "0" : join(chunknametuple, metadata.dimension_separator))
-                    chunknameidx = get(Returns(0), keyname_dict, chunkname)
-                    if chunknameidx > 0
-                        rawchunkdata = read_key_idx(reader, chunknameidx)
-                        decompressed_chunkdata = decompress!(Vector{UInt8}(), rawchunkdata, metadata)
-                        chunkstart = chunktuple .* chunks .+ 1
-                        chunkstop = min.(chunkstart .+ chunks .- 1, shape)
-                        real_chunksize = chunkstop .- chunkstart .+ 1
-                        if julia_size == 1
-                            @assert zarr_size == 1
-                            shaped_chunkdata = reshape(decompressed_chunkdata, chunks...)
-                            shaped_array = reinterpret(UInt8, array)
-                            array_view = view(shaped_array, (range.(chunkstart, chunkstop))...)
-                            chunk_view = view(shaped_chunkdata, (range.(1, real_chunksize))...)
-                            array_view .= chunk_view
-                        else
-                            shaped_chunkdata = reshape(decompressed_chunkdata, zarr_size, chunks...)
-                            shaped_array = reinterpret(reshape, UInt8, array)
-                            # now create overlapping views
-                            array_view = view(shaped_array, :, (range.(chunkstart, chunkstop))...)
-                            chunk_view = view(shaped_chunkdata, :, (range.(1, real_chunksize))...)
-                            # TODO check if the data can just be directly copied.
-                            for (zarr_byte, julia_byte) in enumerate(metadata.dtype.byteorder)
-                                selectdim(array_view, 1, julia_byte) .= selectdim(chunk_view, 1, zarr_byte)
-                            end
-                        end
-                    end
-                end
-            end
-
-            zarray = if metadata.is_column_major || ndims(array) ≤ 1
-                ZArray(array;
-                    chunks = Tuple(chunks),
-                    compressor = metadata.compressor,
-                )
-            else
-                ZArray(permutedims(array,reverse(1:length(shape)));
-                    chunks = Tuple(reverse(chunks)),
-                    compressor = metadata.compressor,
-                )
-            end
             output[arrayname] = zarray

             try_add_attrs!(zarray, reader, keyname_dict, arrayname*"/")
         end
     end
     output
 end
+
+
+function load_array(
+    fill_value::T,
+    shape::NTuple{N, Int},
+    chunks::NTuple{N, Int},
+    arrayname::String,
+    dimension_separator::Char,
+    keyname_dict::Dict{String,Int},
+    reader,
+    in_native_order::Bool,
+    is_column_major::Bool,
+    compressor,
+)::ZArray{T, N} where {T, N}
+    array = fill(fill_value, shape...)
+    # If there is no actual data don't load chunks
+    if !(any(==(0), shape) || sizeof(T) == 0)
+        # load chunks
+        for chunkidx in CartesianIndices(Tuple(cld.(shape,chunks)))
+            chunktuple = Tuple(chunkidx) .- 1
+            # empty chunk has name "0" this is the case for zero dim arrays
+            chunkname = arrayname*"/"*(isempty(chunktuple) ? "0" : join(chunktuple, dimension_separator))
+            chunknameidx = get(Returns(0), keyname_dict, chunkname)
+            if chunknameidx > 0
+                rawchunkdata = read_key_idx(reader, chunknameidx)
+                decompressed_chunkdata = Vector{T}(undef, prod(chunks))
+                GC.@preserve decompressed_chunkdata unsafe_decompress!(
+                    Base.unsafe_convert(Ptr{UInt8}, decompressed_chunkdata),
+                    sizeof(decompressed_chunkdata),
+                    rawchunkdata,
+                    compressor,
+                )
+                if !in_native_order
+                    for i in eachindex(decompressed_chunkdata)
+                        decompressed_chunkdata[i] = htol(ntoh(decompressed_chunkdata[i]))
+                    end
+                end
+                chunkstart = chunktuple .* chunks .+ 1
+                chunkstop = min.(chunkstart .+ chunks .- 1, shape)
+                real_chunksize = chunkstop .- chunkstart .+ 1
+
+                shaped_chunkdata = if is_column_major || N ≤ 1
+                    reshape(decompressed_chunkdata, chunks...)
+                else
+                    permutedims(reshape(decompressed_chunkdata, reverse(chunks)...), ((N:-1:1)...,))
+                end
+                copyto!(
+                    array,
+                    CartesianIndices(((range.(chunkstart, chunkstop))...,)),
+                    shaped_chunkdata,
+                    CartesianIndices(((range.(1, real_chunksize))...,))
+                )
+            end
+        end
+    end
+
+    ZArray(array;
+        chunks,
+        compressor,
+    )
+end
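Two ingredients of the new load_array, checked in isolation with plain Base Julia and toy sizes: copyto! with a pair of CartesianIndices performs the direct chunk-into-array copy (the "direct copying" of the commit message), and htol(ntoh(x)) is an unconditional byte swap on any host, which is exactly what a value stored in non-native order needs:

A = zeros(Int, 4, 4)                    # destination, like `array`
B = reshape(collect(1:6), 2, 3)         # a shaped 2×3 chunk
copyto!(A, CartesianIndices((3:4, 2:4)), B, CartesianIndices((1:2, 1:3)))
@assert A[3:4, 2:4] == B                # chunk copied straight into its slot

x = 0x11223344
@assert htol(ntoh(x)) === bswap(x) === 0x44332211   # exactly one swap on any endianness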
10 changes: 4 additions & 6 deletions src/saving.jl
@@ -83,15 +83,16 @@ function _save_zarray(writer::AbstractWriter, key_prefix::String, z::ZArray)
     dtype_str::String = sprint(write_type, eltype(data))
     dtype::ParsedType = parse_zarr_type(JSON3.read(dtype_str))
     @assert dtype.julia_type == eltype(data)
+    @assert dtype.in_native_order
     shape = size(data)
-    zarr_size = dtype.zarr_size
+    zarr_size = dtype.type_size
     norm_compressor = normalize_compressor(z.compressor)
     if zarr_size != 0 && !any(iszero, shape)
         chunks = Tuple(z.chunks)
         # store chunks
         shaped_chunkdata = zeros(UInt8, zarr_size, reverse(chunks)...)
         permuted_shaped_chunkdata = PermutedDimsArray(shaped_chunkdata, (1, ndims(z)+1:-1:2...))
-        shaped_array = if dtype.julia_size == 1
+        shaped_array = if zarr_size == 1
             reshape(reinterpret(reshape, UInt8, data), 1, shape...)
         else
             reinterpret(reshape, UInt8, data)
@@ -104,10 +105,7 @@ function _save_zarray(writer::AbstractWriter, key_prefix::String, z::ZArray)
             # now create overlapping views
             array_view = view(shaped_array, :, (range.(chunkstart, chunkstop))...)
             chunk_view = view(permuted_shaped_chunkdata, :, (range.(1, real_chunksize))...)
-            # TODO check if the data can just be directly copied.
-            for (zarr_byte, julia_byte) in enumerate(dtype.byteorder)
-                selectdim(chunk_view, 1, zarr_byte) .= selectdim(array_view, 1, julia_byte)
-            end
+            copy!(chunk_view, array_view)
             compressed_chunkdata = compress(norm_compressor, reshape(shaped_chunkdata,:), zarr_size)
             # empty chunk has name "0" this is the case for zero dim arrays
             chunkname = key_prefix*(isempty(chunktuple) ? "0" : join(chunktuple, '.'))
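This resolves the old TODO: with dtype.in_native_order asserted above, the on-disk byte layout matches the in-memory layout, so copying byte plane j to byte plane j for every j amounts to one whole-array copy. A toy equivalence check in plain Base Julia:

A = reshape(collect(UInt8, 1:24), 4, 3, 2)   # byte planes along dim 1, like the views above
B = zeros(UInt8, size(A))
for j in axes(A, 1)                          # the old per-byte-plane loop
    selectdim(B, 1, j) .= selectdim(A, 1, j)
end
C = zeros(UInt8, size(A))
copy!(C, A)                                  # the new single direct copy
@assert B == C == A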