Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue6: support for loading only chunks needed for a selection #58

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyfive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"""

from .high_level import File
from .indexing import ZarrArrayStub, OrthogonalIndexer

__version__ = '0.4.0.dev'
14 changes: 14 additions & 0 deletions pyfive/btree.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,20 @@ def construct_data_from_chunks(

non_padded_region = tuple([slice(i) for i in data_shape])
return data[non_padded_region]

def get_one_chunk_buffer(self, addr, size, itemsize, filter_pipeline, filter_mask):
"""
Used when getting data chunk by chunk for reading partial data arrays
All the shaping and positioning is done in the calling function.
"""
self.fh.seek(addr)
chunk_buffer = self.fh.read(size)
if filter_pipeline is not None:
#chunk_buffer = self.fh.read(size)
#filter_mask = filter_mask
chunk_buffer = self._filter_chunk(
chunk_buffer, filter_mask, filter_pipeline, itemsize)
return chunk_buffer

@classmethod
def _filter_chunk(cls, chunk_buffer, filter_mask, filter_pipeline, itemsize):
Expand Down
250 changes: 181 additions & 69 deletions pyfive/dataobjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .btree import BTreeV2GroupNames, BTreeV2GroupOrders
from .btree import GZIP_DEFLATE_FILTER, SHUFFLE_FILTER, FLETCH32_FILTER
from .misc_low_level import Heap, SymbolTable, GlobalHeap, FractalHeap
from .indexing import OrthogonalIndexer, ZarrArrayStub

# these constants happen to have the same value...
UNLIMITED_SIZE = UNDEFINED_ADDRESS
Expand All @@ -28,7 +29,7 @@ class DataObjects(object):
HDF5 DataObjects.
"""

def __init__(self, fh, offset):
def __init__(self, fh, offset, order='C'):
""" initalize. """
fh.seek(offset)
version_hint = struct.unpack_from('<B', fh.read(1))[0]
Expand All @@ -53,6 +54,7 @@ def __init__(self, fh, offset):
self._chunks = None
self._chunk_dims = None
self._chunk_address = None
self.order = order

@staticmethod
def _parse_v1_objects(fh):
Expand Down Expand Up @@ -417,74 +419,6 @@ def filter_pipeline(self):
self._filter_pipeline = filters
return self._filter_pipeline

def get_data(self):
""" Return the data pointed to in the DataObject. """

# offset and size from data storage message
msg = self.find_msg_type(DATA_STORAGE_MSG_TYPE)[0]
msg_offset = msg['offset_to_message']
version, dims, layout_class, property_offset = (
self._get_data_message_properties(msg_offset))

if layout_class == 0: # compact storage
raise NotImplementedError("Compact storage")
elif layout_class == 1: # contiguous storage
return self._get_contiguous_data(property_offset)
if layout_class == 2: # chunked storage
return self._get_chunked_data(msg_offset)

def _get_data_message_properties(self, msg_offset):
""" Return the message properties of the DataObject. """
dims, layout_class, property_offset = None, None, None
version, arg1, arg2 = struct.unpack_from(
'<BBB', self.msg_data, msg_offset)
if (version == 1) or (version == 2):
dims = arg1
layout_class = arg2
property_offset = msg_offset
property_offset += struct.calcsize('<BBB')
# reserved fields: 1 byte, 1 int
property_offset += struct.calcsize('<BI')
# compact storage (layout class 0) not supported:
assert (layout_class == 1) or (layout_class == 2)
elif (version == 3) or (version == 4):
layout_class = arg1
property_offset = msg_offset
property_offset += struct.calcsize('<BB')
assert (version >= 1) and (version <= 4)
return version, dims, layout_class, property_offset

def _get_contiguous_data(self, property_offset):
data_offset, = struct.unpack_from('<Q', self.msg_data, property_offset)

if data_offset == UNDEFINED_ADDRESS:
# no storage is backing array, return all zeros
return np.zeros(self.shape, dtype=self.dtype)

if not isinstance(self.dtype, tuple):
# return a memory-map to the stored array with copy-on-write
return np.memmap(self.fh, dtype=self.dtype, mode='c',
offset=data_offset, shape=self.shape, order='C')
else:
dtype_class = self.dtype[0]
if dtype_class == 'REFERENCE':
size = self.dtype[1]
if size != 8:
raise NotImplementedError('Unsupported Reference type')
ref_addresses = np.memmap(
self.fh, dtype=('<u8'), mode='c', offset=data_offset,
shape=self.shape, order='C')
return np.array([Reference(addr) for addr in ref_addresses])
else:
raise NotImplementedError('datatype not implemented')

def _get_chunked_data(self, offset):
""" Return data which is chunked. """
self._get_chunk_params()
chunk_btree = BTreeV1RawDataChunks(
self.fh, self._chunk_address, self._chunk_dims)
return chunk_btree.construct_data_from_chunks(
self.chunks, self.shape, self.dtype, self.filter_pipeline)

def _get_chunk_params(self):
"""
Expand Down Expand Up @@ -659,7 +593,185 @@ def _decode_link_info_msg(data, offset):
def is_dataset(self):
""" True when DataObjects points to a dataset, False for a group. """
return len(self.find_msg_type(DATASPACE_MSG_TYPE)) > 0


class DatasetDataObject(DataObjects):
"""
Subclass of DataObjects associated with one Dataset, and
which handles actual data access.
"""
def __init__(self,*args,**kwargs):
"""
Initialise via super class
"""
super().__init__(*args,**kwargs)

# Need our own copy for now to utilise the zarr indexer.
self._zchunk_index={}
self.order='C'

# offset and size from data storage message
msg = self.find_msg_type(DATA_STORAGE_MSG_TYPE)[0]
self.msg_offset = msg['offset_to_message']
version, dims, self.layout_class, self.property_offset = (
self._get_data_message_properties(self.msg_offset))

def get_data(self, args=None):
"""
Return the data pointed to in the DataObject.
"""

if self.layout_class == 0: # compact storage
raise NotImplementedError("Compact storage")
elif self.layout_class == 1: # contiguous storage
if args is None:
return self._get_contiguous_data(self.property_offset)
else:
return self._get_contiguous_data(self.property_offset)[args]
if self.layout_class == 2: # chunked storage
# If reading all chunks, use the (hopefully faster) "do it one go" method.
# If the dtype is a tuple, we don't really know how to deal with it chunk by chunk in this version
if args is None:
return self._get_chunked_data(self.msg_offset)
elif isinstance(self.dtype, tuple):
return self._get_chunked_data(self.msg_offset)[args]
else:
return self._get_selection_via_chunks(args)

def _get_data_message_properties(self, msg_offset):
""" Return the message properties of the DataObject. """
dims, layout_class, property_offset = None, None, None
version, arg1, arg2 = struct.unpack_from(
'<BBB', self.msg_data, msg_offset)
if (version == 1) or (version == 2):
dims = arg1
layout_class = arg2
property_offset = msg_offset
property_offset += struct.calcsize('<BBB')
# reserved fields: 1 byte, 1 int
property_offset += struct.calcsize('<BI')
# compact storage (layout class 0) not supported:
assert (layout_class == 1) or (layout_class == 2)
elif (version == 3) or (version == 4):
layout_class = arg1
property_offset = msg_offset
property_offset += struct.calcsize('<BB')
assert (version >= 1) and (version <= 4)
return version, dims, layout_class, property_offset

def _get_contiguous_data(self, property_offset):
data_offset, = struct.unpack_from('<Q', self.msg_data, property_offset)

if data_offset == UNDEFINED_ADDRESS:
# no storage is backing array, return all zeros
return np.zeros(self.shape, dtype=self.dtype)

if not isinstance(self.dtype, tuple):
# return a memory-map to the stored array with copy-on-write
return np.memmap(self.fh, dtype=self.dtype, mode='c',
offset=data_offset, shape=self.shape, order=self.order)
else:
dtype_class = self.dtype[0]
if dtype_class == 'REFERENCE':
size = self.dtype[1]
if size != 8:
raise NotImplementedError('Unsupported Reference type')
ref_addresses = np.memmap(
self.fh, dtype=('<u8'), mode='c', offset=data_offset,
shape=self.shape, order=self.order)
return np.array([Reference(addr) for addr in ref_addresses])
else:
raise NotImplementedError('datatype not implemented')

def _get_chunked_data(self, offset):
""" Return data which is chunked. """
self._get_chunk_params()
chunk_btree = BTreeV1RawDataChunks(
self.fh, self._chunk_address, self._chunk_dims)
return chunk_btree.construct_data_from_chunks(
self.chunks, self.shape, self.dtype, self.filter_pipeline)

def get_chunk_details(self, chunk_coords):
"""
Returns the chunk details associated with chunk coords
returned by the Zarr orthogonal indexer. The special case
is that if the data is contiguous, we still want to return
the offset and size, as the point of this entry point is
to provide third party applications an address to the data.
"""
if self.layout_class == 0: # compact storage
raise NotImplementedError("Compact storage")
elif self.layout_class == 1: # contiguous storage
# This option never used by pyfive itself as we use the memory map for
# access to contiguous data, but third parties may need it.
# Ignore coordinates, just give the location and size of entire array
data_offset, = struct.unpack_from('<Q', self.msg_data, self.property_offset)
# No way there can be filtering of an unchunked dataset, so no filter mask?
return data_offset, np.prod(self.shape)*np.dtype(self.dtype).itemsize, None
else:
if self._zchunk_index == {}:
self._get_chunk_addresses()

return self._zchunk_index[chunk_coords]

def _get_chunk_addresses(self):
"""
Get the offset addresses associated with all the chunks
known to the b-tree of this object, and load them into
an index suitable for use with the zarr indexer.
"""
if self._zchunk_index == {}:

self._get_chunk_params()

self.chunk_btree = BTreeV1RawDataChunks(
self.fh, self._chunk_address, self._chunk_dims)

count = np.prod(self.shape)
itemsize = np.dtype(self.dtype).itemsize

# The zarr orthogonal indexer returns the position in chunk
# space, whereas pyfive wants the position in array space.
# Here we index the pyfive chunk_index in zarr index space.

ichunks = [1/c for c in self.chunks]

for node in self.chunk_btree.all_nodes[0]:
for node_key, addr in zip(node['keys'], node['addresses']):
size = node_key['chunk_size']
if self.filter_pipeline:
# I am not sure this varies per chunk, but in case it does
filter_mask = node_key['filter_mask']
else:
filter_mask=None
start = node_key['chunk_offset'][:-1]
key = tuple([int(i*d) for i,d in zip(list(start),ichunks)])
self._zchunk_index[key] = (addr,size,filter_mask)

def _get_selection_via_chunks(self, args):
"""
Use the zarr orthogonal indexer to extract data for a specfic selection within
the dataset array and in doing so, only load the relevant chunks.
"""

array = ZarrArrayStub(self.shape, self.chunks)
indexer = OrthogonalIndexer(args, array)
# FIXME: Need to understand what drop_axes was up to and whether or not
# it is relevant to this or not (I didn't understand it in the zarr implementation).

itemsize = np.dtype(self.dtype).itemsize
out_shape = indexer.shape
out = np.empty(out_shape, dtype=self.dtype, order=self.order)

for chunk_coords, chunk_selection, out_selection in indexer:
addr, chunk_buffer_size, filter_mask = self.get_chunk_details(chunk_coords)
chunk_buffer = self.chunk_btree.get_one_chunk_buffer(
addr, chunk_buffer_size, itemsize, self.filter_pipeline, filter_mask)
chunk_data = np.frombuffer(chunk_buffer, dtype=self.dtype)
out[out_selection] = chunk_data.reshape(self.chunks, order=self.order)[chunk_selection]

return out


def determine_data_shape(buf, offset):
""" Return the shape of the dataset pointed to in a Dataspace message. """
Expand Down
7 changes: 4 additions & 3 deletions pyfive/high_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import numpy as np

from .core import Reference
from .dataobjects import DataObjects
from .dataobjects import DataObjects, DatasetDataObject
from .misc_low_level import SuperBlock


Expand Down Expand Up @@ -89,7 +89,7 @@ def __getitem__(self, y):
if dataobjs.is_dataset:
if additional_obj != '.':
raise KeyError('%s is a dataset, not a group' % (obj_name))
return Dataset(obj_name, dataobjs, self)
return Dataset(obj_name, DatasetDataObject(self.file._fh, link_target), self)
return Group(obj_name, dataobjs, self)[additional_obj]

def __iter__(self):
Expand Down Expand Up @@ -276,7 +276,7 @@ def __repr__(self):
return '<HDF5 dataset "%s": shape %s, type "%s">' % info

def __getitem__(self, args):
data = self._dataobjects.get_data()[args]
data = self._dataobjects.get_data(args)
if self._astype is None:
return data
return data.astype(self._astype)
Expand Down Expand Up @@ -323,6 +323,7 @@ def ndim(self):
@property
def dtype(self):
""" dtype attribute. """
# In the HDF5 implementation this is a numpy dtype
return self._dataobjects.dtype

@property
Expand Down
Loading
Loading