Merge pull request #622 from jamesmudd/fixed-array-paging
Add fixed array paging support
jamesmudd authored Nov 25, 2024
2 parents a064b24 + 5668723 commit d733f4d
Showing 7 changed files with 256 additions and 34 deletions.
2 changes: 1 addition & 1 deletion jhdf/src/main/java/io/jhdf/Utils.java
@@ -201,7 +201,7 @@ public static long readBytesAsUnsignedLong(ByteBuffer buffer, int length) {
case 8:
long value = buffer.getLong();
if (value < 0 && value != Constants.UNDEFINED_ADDRESS) {
throw new ArithmeticException("Could not convert to unsigned");
throw new ArithmeticException("Could not convert to unsigned value: " + value);
}
return value;
default:
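For context on the changed guard: Java has no unsigned 64-bit type, so any 8-byte field with its top bit set reads back negative and cannot be returned as an unsigned value; the one tolerated negative is Constants.UNDEFINED_ADDRESS, HDF5's all-ones undefined-address sentinel. A minimal sketch of the failure mode (illustrative, not part of the diff):

import java.nio.ByteBuffer;

public class UnsignedLongExample {
	public static void main(String[] args) {
		// An 8-byte field with the high bit set, as could appear in a corrupt file
		ByteBuffer bb = ByteBuffer.allocate(8).putLong(0x8000000000000000L);
		bb.flip();
		long value = bb.getLong();
		// Prints -9223372036854775808: negative, so readBytesAsUnsignedLong(bb, 8)
		// would throw ArithmeticException, now reporting this value in the message
		System.out.println(value);
	}
}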
19 changes: 19 additions & 0 deletions jhdf/src/main/java/io/jhdf/checksum/ChecksumUtils.java
@@ -34,7 +34,26 @@ public static void validateChecksum(ByteBuffer buffer) {
if (calculatedChecksum != storedChecksum) {
throw new HdfChecksumMismatchException(storedChecksum, calculatedChecksum);
}
}

/**
* Checks that the next 4 bytes of the buffer are the Jenkins lookup3 checksum of the bytes between the mark and
* the current position. The buffer position and mark are moved to after the checksum.
*
* @param buffer the buffer to check
* @throws HdfChecksumMismatchException if the checksum is incorrect.
*/
public static void validateChecksumFromMark(ByteBuffer buffer) {
int position = buffer.position();
int end = buffer.reset().position(); // Move the buffer to the mark and get the new position
byte[] bytes = new byte[position - end];
buffer.get(bytes);
int calculatedChecksum = checksum(bytes);
int storedChecksum = buffer.getInt();
if (calculatedChecksum != storedChecksum) {
throw new HdfChecksumMismatchException(storedChecksum, calculatedChecksum);
}
buffer.mark();
}

public static int checksum(ByteBuffer buffer) {
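A minimal usage sketch of the new validateChecksumFromMark (illustrative; the buffer here is built by hand rather than read from a file, and it assumes the ByteBuffer and byte[] checksum overloads hash the same bytes identically, as they do in jHDF):

import io.jhdf.checksum.ChecksumUtils;
import java.nio.ByteBuffer;

public class ChecksumFromMarkExample {
	public static void main(String[] args) {
		byte[] payload = {1, 2, 3, 4, 5, 6, 7, 8};
		int checksum = ChecksumUtils.checksum(ByteBuffer.wrap(payload));

		// Payload followed by its own lookup3 checksum, mimicking an HDF5 block
		ByteBuffer bb = ByteBuffer.allocate(payload.length + 4);
		bb.put(payload).putInt(checksum);
		bb.flip();

		bb.mark();                   // start of the checksummed region
		bb.position(payload.length); // simulate parsing the payload fields
		// Verifies the bytes between mark and position, then moves past the checksum
		ChecksumUtils.validateChecksumFromMark(bb);
		System.out.println("Checksum valid, position now " + bb.position());
	}
}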
152 changes: 120 additions & 32 deletions jhdf/src/main/java/io/jhdf/dataset/chunked/indexing/FixedArrayIndex.java
@@ -15,6 +15,10 @@
import io.jhdf.dataset.chunked.DatasetInfo;
import io.jhdf.exceptions.HdfException;
import io.jhdf.storage.HdfBackingStorage;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -29,6 +33,8 @@ public class FixedArrayIndex implements ChunkIndex {
private static final byte[] FIXED_ARRAY_HEADER_SIGNATURE = "FAHD".getBytes(StandardCharsets.US_ASCII);
private static final byte[] FIXED_ARRAY_DATA_BLOCK_SIGNATURE = "FADB".getBytes(StandardCharsets.US_ASCII);

private static final Logger logger = LoggerFactory.getLogger(FixedArrayIndex.class);

private final long address;
private final int unfilteredChunkSize;

@@ -40,8 +46,10 @@ public class FixedArrayIndex implements ChunkIndex {
private final int pageBits;
private final int maxNumberOfEntries;
private final long dataBlockAddress;
private final int pages;
private final int pageSize;

private final List<Chunk> chunks;
private final FixedArrayDataBlockInitializer dataBlockInitializer;

public FixedArrayIndex(HdfBackingStorage hdfBackingStorage, long address, DatasetInfo datasetInfo) {
this.address = address;
@@ -51,6 +59,7 @@ public FixedArrayIndex(HdfBackingStorage hdfBackingStorage, long address, DatasetInfo datasetInfo) {

final int headerSize = 12 + hdfBackingStorage.getSizeOfOffsets() + hdfBackingStorage.getSizeOfLengths();
final ByteBuffer bb = hdfBackingStorage.readBufferFromAddress(address, headerSize);
bb.mark();

byte[] formatSignatureBytes = new byte[4];
bb.get(formatSignatureBytes, 0, formatSignatureBytes.length);
@@ -71,25 +80,47 @@
pageBits = bb.get();

maxNumberOfEntries = Utils.readBytesAsUnsignedInt(bb, hdfBackingStorage.getSizeOfLengths());
dataBlockAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
pageSize = 1 << pageBits;
pages = (maxNumberOfEntries + pageSize - 1) / pageSize;

chunks = new ArrayList<>(maxNumberOfEntries);
dataBlockAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());

// Checksum
bb.rewind();
ChecksumUtils.validateChecksum(bb);
ChecksumUtils.validateChecksumFromMark(bb);

// Building the object fills the chunks. Probably should be changed
new FixedArrayDataBlock(this, hdfBackingStorage, dataBlockAddress);
dataBlockInitializer = new FixedArrayDataBlockInitializer(hdfBackingStorage);
logger.info("Read fixed array index header. pages=[{}], maxEntries=[{}]", pages, maxNumberOfEntries);
}
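The paging arithmetic introduced above is plain ceiling division: pageSize is 2^pageBits entries, and the entry count is rounded up to whole pages. A worked example using the numbers from the new test data (pageBits of 10 is an assumed value, matching the test script's 1024-entries-per-page comment):

public class PagingMathExample {
	public static void main(String[] args) {
		int pageBits = 10;             // assumed value from the fixed array header
		int pageSize = 1 << pageBits;  // 1024 entries per page
		int maxNumberOfEntries = 5000; // e.g. a 200x25 dataset with (1, 1) chunks
		int pages = (maxNumberOfEntries + pageSize - 1) / pageSize; // ceiling division
		System.out.println(pages);     // 5, matching the five-page test dataset
	}
}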

private static class FixedArrayDataBlock {
private class FixedArrayDataBlockInitializer extends LazyInitializer<FixedArrayDataBlock> {

private final HdfBackingStorage hdfBackingStorage;

private FixedArrayDataBlock(FixedArrayIndex fixedArrayIndex, HdfBackingStorage hdfBackingStorage, long address) {
public FixedArrayDataBlockInitializer(HdfBackingStorage hdfBackingStorage) {
this.hdfBackingStorage = hdfBackingStorage;
}

@Override
protected FixedArrayDataBlock initialize() {
logger.info("Initializing data block");
return new FixedArrayDataBlock(hdfBackingStorage, FixedArrayIndex.this.dataBlockAddress);
}

// TODO header size ignoring paging
final int headerSize = 6 + hdfBackingStorage.getSizeOfOffsets() + fixedArrayIndex.entrySize * fixedArrayIndex.maxNumberOfEntries + 4;
}
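FixedArrayDataBlockInitializer extends Commons Lang's LazyInitializer, so the data block is parsed at most once, on first access, with thread-safe publication. A standalone sketch of the pattern (class names here are illustrative, not jHDF API):

import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;

public class LazyInitializerExample {
	// Stand-in for an expensive-to-parse object such as the data block
	static class DataBlock {
		DataBlock() { System.out.println("parsed once"); }
	}

	public static void main(String[] args) throws ConcurrentException {
		LazyInitializer<DataBlock> initializer = new LazyInitializer<DataBlock>() {
			@Override
			protected DataBlock initialize() {
				return new DataBlock(); // runs only on the first get()
			}
		};
		DataBlock first = initializer.get();  // triggers initialize()
		DataBlock second = initializer.get(); // reuses the same instance
		System.out.println(first == second);  // true
	}
}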

private class FixedArrayDataBlock {

private final List<Chunk> chunks = new ArrayList<>(maxNumberOfEntries);

private FixedArrayDataBlock(HdfBackingStorage hdfBackingStorage, long address) {

int pageBitmapBytes = (pages + 7) / 8;
int headerSize = 6 + hdfBackingStorage.getSizeOfOffsets() + FixedArrayIndex.this.entrySize * FixedArrayIndex.this.maxNumberOfEntries + 4;
if (pages > 1) {
headerSize += pageBitmapBytes + (4 * pages);
}
final ByteBuffer bb = hdfBackingStorage.readBufferFromAddress(address, headerSize);
bb.mark();

byte[] formatSignatureBytes = new byte[4];
bb.get(formatSignatureBytes, 0, formatSignatureBytes.length);
@@ -105,44 +136,101 @@ private FixedArrayDataBlock(FixedArrayIndex fixedArrayIndex, HdfBackingStorage hdfBackingStorage, long address) {
throw new HdfException("Unsupported fixed array data block version detected. Version: " + version);
}

final int clientId = bb.get();
if (clientId != fixedArrayIndex.clientId) {
final int dataBlockclientId = bb.get();
if (dataBlockclientId != FixedArrayIndex.this.clientId) {
throw new HdfException("Fixed array client ID mismatch. Possible file corruption detected");
}

final long headerAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
if (headerAddress != fixedArrayIndex.address) {
if (headerAddress != FixedArrayIndex.this.address) {
throw new HdfException("Fixed array data block header address missmatch");
}

// TODO ignoring paging here might need to revisit
if (pages > 1) {
readPaged(hdfBackingStorage, pageBitmapBytes, bb, dataBlockclientId);
} else {
// Unpaged
logger.info("Reading unpaged");
if (dataBlockclientId == 0) { // Not filtered
for (int i = 0; i < FixedArrayIndex.this.maxNumberOfEntries; i++) {
readUnfiltered(hdfBackingStorage.getSizeOfOffsets(), bb, i);
}
} else if (dataBlockclientId == 1) { // Filtered
for (int i = 0; i < FixedArrayIndex.this.maxNumberOfEntries; i++) {
readFiltered(hdfBackingStorage, bb, i);
}
} else {
throw new HdfException("Unrecognized client ID = " + dataBlockclientId);
}

if (clientId == 0) { // Not filtered
for (int i = 0; i < fixedArrayIndex.maxNumberOfEntries; i++) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(i, fixedArrayIndex.chunkDimensions, fixedArrayIndex.datasetDimensions);
fixedArrayIndex.chunks.add(new ChunkImpl(chunkAddress, fixedArrayIndex.unfilteredChunkSize, chunkOffset));
ChecksumUtils.validateChecksumFromMark(bb);
}
}

private void readPaged(HdfBackingStorage hdfBackingStorage, int pageBitmapBytes, ByteBuffer bb, int dataBlockclientId) {
logger.info("Reading paged");
byte[] pageBitmap = new byte[pageBitmapBytes];
bb.get(pageBitmap);

ChecksumUtils.validateChecksumFromMark(bb);

int chunkIndex = 0;
for (int page = 0; page < pages; page++) {
final int currentPageSize = getCurrentPageSize(page);

if (dataBlockclientId == 0) { // Not filtered
for (int i = 0; i < currentPageSize; i++) {
readUnfiltered(hdfBackingStorage.getSizeOfOffsets(), bb, chunkIndex++);
}
} else if (dataBlockclientId == 1) { // Filtered
for (int i = 0; i < currentPageSize; i++) {
readFiltered(hdfBackingStorage, bb, chunkIndex++);
}
} else {
throw new HdfException("Unrecognized client ID = " + dataBlockclientId);
}
} else if (clientId == 1) { // Filtered
for (int i = 0; i < fixedArrayIndex.maxNumberOfEntries; i++) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
final int chunkSizeInBytes = Utils.readBytesAsUnsignedInt(bb, fixedArrayIndex.entrySize - hdfBackingStorage.getSizeOfOffsets() - 4);
final BitSet filterMask = BitSet.valueOf(new byte[]{bb.get(), bb.get(), bb.get(), bb.get()});
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(i, fixedArrayIndex.chunkDimensions, fixedArrayIndex.datasetDimensions);

fixedArrayIndex.chunks.add(new ChunkImpl(chunkAddress, chunkSizeInBytes, chunkOffset, filterMask));
ChecksumUtils.validateChecksumFromMark(bb);
}
}

private int getCurrentPageSize(int page) {
final int currentPageSize;
if (page == pages - 1) {
// last page so maybe not a full page
int lastPageSize = FixedArrayIndex.this.maxNumberOfEntries % FixedArrayIndex.this.pageSize;
if (lastPageSize == 0) {
currentPageSize = FixedArrayIndex.this.pageSize;
} else {
currentPageSize = lastPageSize;
}
} else {
throw new HdfException("Unrecognized client ID = " + clientId);
currentPageSize = FixedArrayIndex.this.pageSize;
}
return currentPageSize;
}
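Concretely, with 1024-entry pages and 5000 total entries, pages 0 to 3 are full and the last page holds 5000 % 1024 = 904 entries; the modulo is zero only when the entry count is an exact multiple of the page size, in which case the last page is also full.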

private void readFiltered(HdfBackingStorage hdfBackingStorage, ByteBuffer bb, int i) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
final int chunkSizeInBytes = Utils.readBytesAsUnsignedInt(bb, FixedArrayIndex.this.entrySize - hdfBackingStorage.getSizeOfOffsets() - 4);
final BitSet filterMask = BitSet.valueOf(new byte[]{bb.get(), bb.get(), bb.get(), bb.get()});
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(i, FixedArrayIndex.this.chunkDimensions, FixedArrayIndex.this.datasetDimensions);

bb.rewind();
ChecksumUtils.validateChecksum(bb);
chunks.add(new ChunkImpl(chunkAddress, chunkSizeInBytes, chunkOffset, filterMask));
}
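For reference, each filtered entry read above is laid out as the chunk address (sizeOfOffsets bytes), then the chunk's stored size (entrySize - sizeOfOffsets - 4 bytes, which is exactly the arithmetic in the readBytesAsUnsignedInt call), then a 4-byte filter mask.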

private void readUnfiltered(int sizeOfOffsets, ByteBuffer bb, int chunkIndex) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, sizeOfOffsets);
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(chunkIndex, FixedArrayIndex.this.chunkDimensions, FixedArrayIndex.this.datasetDimensions);
chunks.add(new ChunkImpl(chunkAddress, FixedArrayIndex.this.unfilteredChunkSize, chunkOffset));
}
}

@Override
public Collection<Chunk> getAllChunks() {
return chunks;
try {
return this.dataBlockInitializer.get().chunks;
} catch (ConcurrentException e) {
throw new HdfException("Error initializing data block", e);
}
}
}
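With the lazy initializer in place, constructing a FixedArrayIndex now only reads and checksums the header; the data block, and with it every chunk record, is decoded on the first getAllChunks() call, and a failure during that deferred parse surfaces as an HdfException.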
73 changes: 73 additions & 0 deletions jhdf/src/test/java/io/jhdf/dataset/chunked/indexing/FixedArrayIndexTest.java
@@ -0,0 +1,73 @@
/*
* This file is part of jHDF. A pure Java library for accessing HDF5 files.
*
* https://jhdf.io
*
* Copyright (c) 2024 James Mudd
*
* MIT License see 'LICENSE' file
*/
package io.jhdf.dataset.chunked.indexing;

import io.jhdf.HdfFile;
import io.jhdf.api.Dataset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.jhdf.TestUtils.loadTestHdfFile;
import static org.assertj.core.api.Assertions.assertThat;

class FixedArrayIndexTest {

private static final String HDF5_TEST_FILE_NAME = "fixed_array_paged_datasets.hdf5";

private static HdfFile hdfFile;

@BeforeAll
static void setup() throws Exception {
hdfFile = loadTestHdfFile(HDF5_TEST_FILE_NAME);
}

@AfterAll
static void tearDown() {
hdfFile.close();
}

@Test
void testDataReadCorrectly() {
// Unfiltered
Dataset int8Unpaged = hdfFile.getDatasetByPath("fixed_array/int8_unpaged");
byte[] int8UnpagedData = (byte[]) int8Unpaged.getDataFlat();
assertThat(int8UnpagedData).isEqualTo(expectedData(Math.toIntExact(int8Unpaged.getSize())));

Dataset int8TwoPage = hdfFile.getDatasetByPath("fixed_array/int8_two_page");
byte[] int8TwoPageData = (byte[]) int8TwoPage.getDataFlat();
assertThat(int8TwoPageData).isEqualTo(expectedData(Math.toIntExact(int8TwoPage.getSize())));

Dataset int8FivePage = hdfFile.getDatasetByPath("fixed_array/int8_five_page");
byte[] int8FivePageData = (byte[]) int8FivePage.getDataFlat();
assertThat(int8FivePageData).isEqualTo(expectedData(Math.toIntExact(int8FivePage.getSize())));

// Filtered
Dataset int8UnpagedFiltered = hdfFile.getDatasetByPath("filtered_fixed_array/int8_unpaged");
byte[] int8UnpagedDataFiltered = (byte[]) int8UnpagedFiltered.getDataFlat();
assertThat(int8UnpagedDataFiltered).isEqualTo(expectedData(Math.toIntExact(int8UnpagedFiltered.getSize())));

Dataset int8TwoPageFiltered = hdfFile.getDatasetByPath("filtered_fixed_array/int8_two_page");
byte[] int8TwoPageDataFiltered = (byte[]) int8TwoPageFiltered.getDataFlat();
assertThat(int8TwoPageDataFiltered).isEqualTo(expectedData(Math.toIntExact(int8TwoPageFiltered.getSize())));

Dataset int8FivePageFiltered = hdfFile.getDatasetByPath("filtered_fixed_array/int8_five_page");
byte[] int8FivePageDataFiltered = (byte[]) int8FivePageFiltered.getDataFlat();
assertThat(int8FivePageDataFiltered).isEqualTo(expectedData(Math.toIntExact(int8FivePageFiltered.getSize())));
}

private byte[] expectedData(int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
bytes[i] = (byte) i;
}
return bytes;
}
}
2 changes: 1 addition & 1 deletion jhdf/src/test/java/io/jhdf/writing/StringWritingTest.java
@@ -110,7 +110,7 @@ void writeStrings() throws Exception {
assertThat(datasets).hasSize(4);

String[] proseReadBackArray = (String[]) hdfFile.getDatasetByPath("prose").getData();
String proseReadback = StringUtils.joinWith(" ", proseReadBackArray);
String proseReadback = StringUtils.joinWith(" ", (Object[]) proseReadBackArray);
assertThat(proseReadback).isEqualTo(prose);

// Just check the whole file is readable
Binary file fixed_array_paged_datasets.hdf5 not shown.
42 changes: 42 additions & 0 deletions jhdf/src/test/resources/scripts/fixed_array_paged.py
@@ -0,0 +1,42 @@
# -------------------------------------------------------------------------------
# This file is part of jHDF. A pure Java library for accessing HDF5 files.
#
# https://jhdf.io
#
# Copyright (c) 2024 James Mudd
#
# MIT License see 'LICENSE' file
# -------------------------------------------------------------------------------
import h5py

import numpy as np


def write_chunked_datasets(f):
# Fewer than 1025 elements should be unpaged
data = np.arange(1000).reshape(10, 100)
# 1024 elements per page
two_page_data = np.arange(2048).reshape(128, 16)
five_page_data = np.arange(5000).reshape(200, 25)

# Fixed Array Index - Fixed maximum dimension sizes. Index type 3
fixed_array_group = f.create_group("fixed_array")
fixed_array_group.create_dataset("int8_unpaged", data=data, dtype='i1', chunks=(2, 3))
fixed_array_group.create_dataset("int8_two_page", data=two_page_data, dtype='i1', chunks=(1, 1))
fixed_array_group.create_dataset("int8_five_page", data=five_page_data, dtype='i1', chunks=(1, 1))

filtered_fixed_array_group = f.create_group("filtered_fixed_array")
filtered_fixed_array_group.create_dataset("int8_unpaged", data=data, dtype='i1', chunks=(2, 3), compression="gzip")
filtered_fixed_array_group.create_dataset("int8_two_page", data=two_page_data, dtype='i1', chunks=(1, 1), compression="gzip")
filtered_fixed_array_group.create_dataset("int8_five_page", data=five_page_data, dtype='i1', chunks=(1, 1), compression="gzip")

f.flush()
f.close()


if __name__ == '__main__':
print('Making chunked v4 dataset test files...')

f = h5py.File('fixed_array_paged_datasets.hdf5', 'w', libver='latest')
write_chunked_datasets(f)
print('fixed_array_paged_datasets.hdf5')
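For orientation, the page counts follow from the chunk shapes: with (1, 1) chunks the 128x16 dataset has 2048 chunk entries (two 1024-entry pages) and the 200x25 dataset has 5000 entries (five pages), while the (2, 3)-chunked 10x100 dataset has ceil(10/2) * ceil(100/3) = 5 * 34 = 170 entries, under the 1024-entry threshold, so its index is unpaged.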
