Add fixed array paging support #622
Merged: 15 commits, Nov 25, 2024
2 changes: 1 addition & 1 deletion jhdf/src/main/java/io/jhdf/Utils.java
@@ -201,7 +201,7 @@ public static long readBytesAsUnsignedLong(ByteBuffer buffer, int length) {
case 8:
long value = buffer.getLong();
if (value < 0 && value != Constants.UNDEFINED_ADDRESS) {
throw new ArithmeticException("Could not convert to unsigned");
throw new ArithmeticException("Could not convert to unsigned value: " + value);
}
return value;
default:
19 changes: 19 additions & 0 deletions jhdf/src/main/java/io/jhdf/checksum/ChecksumUtils.java
@@ -34,7 +34,26 @@ public static void validateChecksum(ByteBuffer buffer) {
if (calculatedChecksum != storedChecksum) {
throw new HdfChecksumMismatchException(storedChecksum, calculatedChecksum);
}
}

/**
* Checks that the next 4 bytes of the buffer are the Jenkins Lookup 3 checksum of the bytes between the mark and the
* current position. The buffer position and mark are moved to after the checksum.
*
* @param buffer the buffer to check
* @throws HdfChecksumMismatchException if the checksum is incorrect.
*/
public static void validateChecksumFromMark(ByteBuffer buffer) {
int position = buffer.position();
int end = buffer.reset().position(); // Move the buffer to the mark and get the new position
byte[] bytes = new byte[position - end];
buffer.get(bytes);
int calculatedChecksum = checksum(bytes);
int storedChecksum = buffer.getInt();
if (calculatedChecksum != storedChecksum) {
throw new HdfChecksumMismatchException(storedChecksum, calculatedChecksum);
}
buffer.mark();
}
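
For context, a minimal sketch of the mark/validate pairing this method enables (the names follow the FixedArrayIndex constructor below; the buffer layout is assumed for illustration):

    ByteBuffer bb = hdfBackingStorage.readBufferFromAddress(address, headerSize);
    bb.mark();                                  // start of the checksummed region
    byte[] signature = new byte[4];
    bb.get(signature);                          // ...read the remaining header fields...
    ChecksumUtils.validateChecksumFromMark(bb); // verifies mark..position, then consumes the 4-byte checksum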

public static int checksum(ByteBuffer buffer) {
152 changes: 120 additions & 32 deletions jhdf/src/main/java/io/jhdf/dataset/chunked/indexing/FixedArrayIndex.java
@@ -15,6 +15,10 @@
import io.jhdf.dataset.chunked.DatasetInfo;
import io.jhdf.exceptions.HdfException;
import io.jhdf.storage.HdfBackingStorage;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -29,6 +33,8 @@ public class FixedArrayIndex implements ChunkIndex {
private static final byte[] FIXED_ARRAY_HEADER_SIGNATURE = "FAHD".getBytes(StandardCharsets.US_ASCII);
private static final byte[] FIXED_ARRAY_DATA_BLOCK_SIGNATURE = "FADB".getBytes(StandardCharsets.US_ASCII);

private static final Logger logger = LoggerFactory.getLogger(FixedArrayIndex.class);

private final long address;
private final int unfilteredChunkSize;

@@ -40,8 +46,10 @@ public class FixedArrayIndex implements ChunkIndex {
private final int pageBits;
private final int maxNumberOfEntries;
private final long dataBlockAddress;
private final int pages;
private final int pageSize;

private final List<Chunk> chunks;
private final FixedArrayDataBlockInitializer dataBlockInitializer;

public FixedArrayIndex(HdfBackingStorage hdfBackingStorage, long address, DatasetInfo datasetInfo) {
this.address = address;
@@ -51,6 +59,7 @@ public FixedArrayIndex(HdfBackingStorage hdfBackingStorage, long address, DatasetInfo datasetInfo) {

final int headerSize = 12 + hdfBackingStorage.getSizeOfOffsets() + hdfBackingStorage.getSizeOfLengths();
final ByteBuffer bb = hdfBackingStorage.readBufferFromAddress(address, headerSize);
bb.mark();

byte[] formatSignatureBytes = new byte[4];
bb.get(formatSignatureBytes, 0, formatSignatureBytes.length);
@@ -71,25 +80,47 @@
pageBits = bb.get();

maxNumberOfEntries = Utils.readBytesAsUnsignedInt(bb, hdfBackingStorage.getSizeOfLengths());
dataBlockAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
pageSize = 1 << pageBits;
pages = (maxNumberOfEntries + pageSize - 1) / pageSize;

chunks = new ArrayList<>(maxNumberOfEntries);
dataBlockAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());

// Checksum
bb.rewind();
ChecksumUtils.validateChecksum(bb);
ChecksumUtils.validateChecksumFromMark(bb);

// Building the object fills the chunks. Probably should be changed
new FixedArrayDataBlock(this, hdfBackingStorage, dataBlockAddress);
dataBlockInitializer = new FixedArrayDataBlockInitializer(hdfBackingStorage);
logger.info("Read fixed array index header. pages=[{}], maxEntries=[{}]", pages, maxNumberOfEntries);
}
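
To make the paging arithmetic concrete, here is a worked example (a sketch; pageBits = 10 gives the 1024-entry pages used by the test datasets later in this PR):

    int pageBits = 10;
    int pageSize = 1 << pageBits;                               // 1024 entries per page
    int maxNumberOfEntries = 2048;                              // e.g. the int8_two_page dataset
    int pages = (maxNumberOfEntries + pageSize - 1) / pageSize; // ceil(2048 / 1024) = 2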

private static class FixedArrayDataBlock {
private class FixedArrayDataBlockInitializer extends LazyInitializer<FixedArrayDataBlock> {

private final HdfBackingStorage hdfBackingStorage;

private FixedArrayDataBlock(FixedArrayIndex fixedArrayIndex, HdfBackingStorage hdfBackingStorage, long address) {
public FixedArrayDataBlockInitializer(HdfBackingStorage hdfBackingStorage) {
this.hdfBackingStorage = hdfBackingStorage;
}

@Override
protected FixedArrayDataBlock initialize() {
logger.info("Initializing data block");
return new FixedArrayDataBlock(hdfBackingStorage, FixedArrayIndex.this.dataBlockAddress);
}

// TODO header size ignoring paging
final int headerSize = 6 + hdfBackingStorage.getSizeOfOffsets() + fixedArrayIndex.entrySize * fixedArrayIndex.maxNumberOfEntries + 4;
}
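
LazyInitializer comes from commons-lang3 and defers the data block read until the first call that needs it, so opening a file never pays for an index it does not use. A stripped-down sketch of the pattern, with a hypothetical DataBlock type:

    LazyInitializer<DataBlock> lazy = new LazyInitializer<DataBlock>() {
        @Override
        protected DataBlock initialize() {
            return new DataBlock(); // runs at most once, on the first get()
        }
    };
    try {
        DataBlock block = lazy.get(); // triggers initialize() the first time
    } catch (ConcurrentException e) {
        throw new HdfException("Error initializing data block", e);
    }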

private class FixedArrayDataBlock {

private final List<Chunk> chunks = new ArrayList<>(maxNumberOfEntries);

private FixedArrayDataBlock(HdfBackingStorage hdfBackingStorage, long address) {

int pageBitmapBytes = (pages + 7) / 8;
int headerSize = 6 + hdfBackingStorage.getSizeOfOffsets() + FixedArrayIndex.this.entrySize * FixedArrayIndex.this.maxNumberOfEntries + 4;
if(pages > 1) {
headerSize += pageBitmapBytes + (4 * pages);
}
final ByteBuffer bb = hdfBackingStorage.readBufferFromAddress(address, headerSize);
bb.mark();

byte[] formatSignatureBytes = new byte[4];
bb.get(formatSignatureBytes, 0, formatSignatureBytes.length);
@@ -105,44 +136,101 @@ private FixedArrayDataBlock(FixedArrayIndex fixedArrayIndex, HdfBackingStorage hdfBackingStorage, long address) {
throw new HdfException("Unsupported fixed array data block version detected. Version: " + version);
}

final int clientId = bb.get();
if (clientId != fixedArrayIndex.clientId) {
final int dataBlockclientId = bb.get();
if (dataBlockclientId != FixedArrayIndex.this.clientId) {
throw new HdfException("Fixed array client ID mismatch. Possible file corruption detected");
}

final long headerAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
if (headerAddress != fixedArrayIndex.address) {
if (headerAddress != FixedArrayIndex.this.address) {
throw new HdfException("Fixed array data block header address missmatch");
}

// TODO ignoring paging here might need to revisit
if(pages > 1) {
readPaged(hdfBackingStorage, pageBitmapBytes, bb, dataBlockclientId);
} else {
// Unpaged
logger.info("Reading unpaged");
if (dataBlockclientId == 0) { // Not filtered
for (int i = 0; i < FixedArrayIndex.this.maxNumberOfEntries; i++) {
readUnfiltered(hdfBackingStorage.getSizeOfOffsets(), bb, i);
}
} else if (dataBlockclientId == 1) { // Filtered
for (int i = 0; i < FixedArrayIndex.this.maxNumberOfEntries; i++) {
readFiltered(hdfBackingStorage, bb, i);
}
} else {
throw new HdfException("Unrecognized client ID = " + dataBlockclientId);
}

if (clientId == 0) { // Not filtered
for (int i = 0; i < fixedArrayIndex.maxNumberOfEntries; i++) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(i, fixedArrayIndex.chunkDimensions, fixedArrayIndex.datasetDimensions);
fixedArrayIndex.chunks.add(new ChunkImpl(chunkAddress, fixedArrayIndex.unfilteredChunkSize, chunkOffset));
ChecksumUtils.validateChecksumFromMark(bb);
}
}

private void readPaged(HdfBackingStorage hdfBackingStorage, int pageBitmapBytes, ByteBuffer bb, int dataBlockclientId) {
logger.info("Reading paged");
byte[] pageBitmap = new byte[pageBitmapBytes];
bb.get(pageBitmap);

ChecksumUtils.validateChecksumFromMark(bb);

int chunkIndex = 0;
for(int page = 0; page < pages; page++) {
final int currentPageSize = getCurrentPageSize(page);

if (dataBlockclientId == 0) { // Not filtered
for (int i = 0; i < currentPageSize; i++) {
readUnfiltered(hdfBackingStorage.getSizeOfOffsets(), bb, chunkIndex++);
}
} else if (dataBlockclientId == 1) { // Filtered
for (int i = 0; i < currentPageSize; i++) {
readFiltered(hdfBackingStorage, bb, chunkIndex++);
}
} else {
throw new HdfException("Unrecognized client ID = " + dataBlockclientId);
}
} else if (clientId == 1) { // Filtered
for (int i = 0; i < fixedArrayIndex.maxNumberOfEntries; i++) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
final int chunkSizeInBytes = Utils.readBytesAsUnsignedInt(bb, fixedArrayIndex.entrySize - hdfBackingStorage.getSizeOfOffsets() - 4);
final BitSet filterMask = BitSet.valueOf(new byte[]{bb.get(), bb.get(), bb.get(), bb.get()});
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(i, fixedArrayIndex.chunkDimensions, fixedArrayIndex.datasetDimensions);

fixedArrayIndex.chunks.add(new ChunkImpl(chunkAddress, chunkSizeInBytes, chunkOffset, filterMask));
ChecksumUtils.validateChecksumFromMark(bb);
}
}

private int getCurrentPageSize(int page) {
final int currentPageSize;
if(page == pages - 1) {
// last page so maybe not a full page
int lastPageSize = FixedArrayIndex.this.maxNumberOfEntries % FixedArrayIndex.this.pageSize;
if(lastPageSize == 0) {
currentPageSize = FixedArrayIndex.this.pageSize;
} else {
currentPageSize = lastPageSize;
}
} else {
throw new HdfException("Unrecognized client ID = " + clientId);
currentPageSize = FixedArrayIndex.this.pageSize;
}
return currentPageSize;
}
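
A worked example of the last-page arithmetic, using the five-page test dataset defined later in this PR (5000 chunks, 1024-entry pages):

    int maxNumberOfEntries = 5000;
    int pageSize = 1024;
    int lastPageSize = maxNumberOfEntries % pageSize; // 5000 % 1024 = 904
    // pages 0..3 hold 1024 entries each; the final page holds the remaining 904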

private void readFiltered(HdfBackingStorage hdfBackingStorage, ByteBuffer bb, int i) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, hdfBackingStorage.getSizeOfOffsets());
final int chunkSizeInBytes = Utils.readBytesAsUnsignedInt(bb, FixedArrayIndex.this.entrySize - hdfBackingStorage.getSizeOfOffsets() - 4);
final BitSet filterMask = BitSet.valueOf(new byte[]{bb.get(), bb.get(), bb.get(), bb.get()});
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(i, FixedArrayIndex.this.chunkDimensions, FixedArrayIndex.this.datasetDimensions);

bb.rewind();
ChecksumUtils.validateChecksum(bb);
chunks.add(new ChunkImpl(chunkAddress, chunkSizeInBytes, chunkOffset, filterMask));
}
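
For orientation, the per-entry layout implied by the reads above, assuming sizeOfOffsets = 8 and entrySize = 16 (both vary per file; this is a sketch, not a quote from the HDF5 spec):

    int sizeOfOffsets = 8;                                   // assumed 64-bit offsets
    int entrySize = 16;                                      // assumed total entry size
    int chunkSizeFieldBytes = entrySize - sizeOfOffsets - 4; // leaves 4 bytes for the chunk size
    // the final 4 bytes are the filter mask, read with four bb.get() calls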

private void readUnfiltered(int sizeOfOffsets, ByteBuffer bb, int chunkIndex) {
final long chunkAddress = Utils.readBytesAsUnsignedLong(bb, sizeOfOffsets);
final int[] chunkOffset = Utils.chunkIndexToChunkOffset(chunkIndex, FixedArrayIndex.this.chunkDimensions, FixedArrayIndex.this.datasetDimensions);
chunks.add(new ChunkImpl(chunkAddress, FixedArrayIndex.this.unfilteredChunkSize, chunkOffset));
}
}

@Override
public Collection<Chunk> getAllChunks() {
return chunks;
try {
return this.dataBlockInitializer.get().chunks;
} catch (ConcurrentException e) {
throw new HdfException("Error initializing data block", e);
}
}
}
73 changes: 73 additions & 0 deletions jhdf/src/test/java/io/jhdf/dataset/chunked/indexing/FixedArrayIndexTest.java
@@ -0,0 +1,73 @@
/*
* This file is part of jHDF. A pure Java library for accessing HDF5 files.
*
* https://jhdf.io
*
* Copyright (c) 2024 James Mudd
*
* MIT License see 'LICENSE' file
*/
package io.jhdf.dataset.chunked.indexing;

import io.jhdf.HdfFile;
import io.jhdf.api.Dataset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.jhdf.TestUtils.loadTestHdfFile;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class FixedArrayIndexTest {

private static final String HDF5_TEST_FILE_NAME = "fixed_array_paged_datasets.hdf5";

private static HdfFile hdfFile;

@BeforeAll
static void setup() throws Exception {
hdfFile = loadTestHdfFile(HDF5_TEST_FILE_NAME);
}

@AfterAll
static void tearDown() {
hdfFile.close();
}

@Test
void testDataReadCorrectly() {
// Unfiltered
Dataset int8Unpaged = hdfFile.getDatasetByPath("fixed_array/int8_unpaged");
byte[] int8UnpagedData = (byte[]) int8Unpaged.getDataFlat();
assertThat(int8UnpagedData).isEqualTo(expectedData(Math.toIntExact(int8Unpaged.getSize())));

Dataset int8TwoPage = hdfFile.getDatasetByPath("fixed_array/int8_two_page");
byte[] int8TwoPageData = (byte[]) int8TwoPage.getDataFlat();
assertThat(int8TwoPageData).isEqualTo(expectedData(Math.toIntExact(int8TwoPage.getSize())));

Dataset int8FivePage = hdfFile.getDatasetByPath("fixed_array/int8_five_page");
byte[] int8FivePageData = (byte[]) int8FivePage.getDataFlat();
assertThat(int8FivePageData).isEqualTo(expectedData(Math.toIntExact(int8FivePage.getSize())));

// Filtered
Dataset int8UnpagedFiltered = hdfFile.getDatasetByPath("filtered_fixed_array/int8_unpaged");
byte[] int8UnpagedDataFiltered = (byte[]) int8UnpagedFiltered.getDataFlat();
assertThat(int8UnpagedDataFiltered).isEqualTo(expectedData(Math.toIntExact(int8UnpagedFiltered.getSize())));

Dataset int8TwoPageFiltered = hdfFile.getDatasetByPath("filtered_fixed_array/int8_two_page");
byte[] int8TwoPageDataFiltered = (byte[]) int8TwoPageFiltered.getDataFlat();
assertThat(int8TwoPageDataFiltered).isEqualTo(expectedData(Math.toIntExact(int8TwoPageFiltered.getSize())));

Dataset int8FivePageFiltered = hdfFile.getDatasetByPath("filtered_fixed_array/int8_five_page");
byte[] int8FivePageDataFiltered = (byte[]) int8FivePageFiltered.getDataFlat();
assertThat(int8FivePageDataFiltered).isEqualTo(expectedData(Math.toIntExact(int8FivePageFiltered.getSize())));
}

private byte[] expectedData(int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
bytes[i] = (byte) i;
}
return bytes;
}
}
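
Note the int8 wrap-around both sides rely on: the fixed_array_paged.py script later in this PR writes np.arange values with dtype i1, and expectedData casts the loop index with (byte) i, so both reduce modulo 256 and the arrays compare equal. For example:

    byte wrapped = (byte) 300; // 300 - 256 == 44, matching the i1 value h5py stores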
2 changes: 1 addition & 1 deletion jhdf/src/test/java/io/jhdf/writing/StringWritingTest.java
@@ -110,7 +110,7 @@ void writeStrings() throws Exception {
assertThat(datasets).hasSize(4);

String[] proseReadBackArray = (String[]) hdfFile.getDatasetByPath("prose").getData();
String proseReadback = StringUtils.joinWith(" ", proseReadBackArray);
String proseReadback = StringUtils.joinWith(" ", (Object[]) proseReadBackArray);
assertThat(proseReadback).isEqualTo(prose);

// Just check the whole file is readable
Expand Down
42 changes: 42 additions & 0 deletions jhdf/src/test/resources/scripts/fixed_array_paged.py
@@ -0,0 +1,42 @@
# -------------------------------------------------------------------------------
# This file is part of jHDF. A pure Java library for accessing HDF5 files.
#
# https://jhdf.io
#
# Copyright (c) 2024 James Mudd
#
# MIT License see 'LICENSE' file
# -------------------------------------------------------------------------------
import h5py

import numpy as np


def write_chunked_datasets(f):
# Fewer than 1025 elements should be unpaged
data = np.arange(1000).reshape(10, 100)
# 1024 elements per page
two_page_data = np.arange(2048).reshape(128, 16)
five_page_data = np.arange(5000).reshape(200, 25)

# Fixed Array Index - Fixed maximum dimension sizes. Index type 3
fixed_array_group = f.create_group("fixed_array")
fixed_array_group.create_dataset("int8_unpaged", data=data, dtype='i1', chunks=(2, 3))
fixed_array_group.create_dataset("int8_two_page", data=two_page_data, dtype='i1', chunks=(1, 1))
fixed_array_group.create_dataset("int8_five_page", data=five_page_data, dtype='i1', chunks=(1, 1))

filtered_fixed_array_group = f.create_group("filtered_fixed_array")
filtered_fixed_array_group.create_dataset("int8_unpaged", data=data, dtype='i1', chunks=(2, 3), compression="gzip")
filtered_fixed_array_group.create_dataset("int8_two_page", data=two_page_data, dtype='i1', chunks=(1, 1), compression="gzip")
filtered_fixed_array_group.create_dataset("int8_five_page", data=five_page_data, dtype='i1', chunks=(1, 1), compression="gzip")

f.flush()
f.close()


if __name__ == '__main__':
print('Making chunked v4 dataset test files...')

f = h5py.File('fixed_array_paged_datasets.hdf5', 'w', libver='latest')
write_chunked_datasets(f)
print('fixed_array_paged_datasets.hdf5')
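
Once the script has produced fixed_array_paged_datasets.hdf5, a minimal jHDF read-back looks like this (a sketch; the file path is an assumption about the working directory):

    import io.jhdf.HdfFile;
    import io.jhdf.api.Dataset;
    import java.nio.file.Paths;

    public class ReadFixedArrayPaged {
        public static void main(String[] args) {
            try (HdfFile hdfFile = new HdfFile(Paths.get("fixed_array_paged_datasets.hdf5"))) {
                // 2048 one-element chunks -> a paged fixed array index with two 1024-entry pages
                Dataset dataset = hdfFile.getDatasetByPath("fixed_array/int8_two_page");
                byte[] data = (byte[]) dataset.getDataFlat();
                System.out.println("Read " + data.length + " bytes");
            }
        }
    }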