Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntuple] remove kNTupleUnknownCompression #17389

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tree/ntuple/v7/inc/ROOT/RNTupleDescriptor.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ public:
NTupleSize_t fNElements = kInvalidNTupleIndex;
/// The usual format for ROOT compression settings (see Compression.h).
/// The pages of a particular column in a particular cluster are all compressed with the same settings.
int fCompressionSettings = kNTupleUnknownCompression;
/// If unset, the compression settings are undefined (deferred columns, suppressed columns).
std::optional<std::uint32_t> fCompressionSettings;
pcanal marked this conversation as resolved.
Show resolved Hide resolved
/// Suppressed columns have an empty page range and unknown compression settings.
/// Their element index range, however, is aligned with the corresponding column of the
/// primary column representation (see Section "Suppressed Columns" in the specification)
Expand Down
4 changes: 2 additions & 2 deletions tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ struct RSealedPageMergeData;
class RClusterPool;

struct RNTupleMergeOptions {
/// If `fCompressionSettings == kNTupleUnknownCompression` (the default), the merger will not change the
/// If fCompressionSettings is empty (the default), the merger will not change the
/// compression of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
/// compression algorithm and level.
int fCompressionSettings = kNTupleUnknownCompression;
std::optional<std::uint32_t> fCompressionSettings;
/// Determines how the merging treats sources with different models (\see ENTupleMergingMode).
ENTupleMergingMode fMergingMode = ENTupleMergingMode::kFilter;
/// Determines how the Merge function behaves upon merging errors
Expand Down
3 changes: 0 additions & 3 deletions tree/ntuple/v7/inc/ROOT/RNTupleUtil.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ enum ENTupleStructure : std::uint16_t { kInvalid, kLeaf, kCollection, kRecord, k
using NTupleSize_t = std::uint64_t;
constexpr NTupleSize_t kInvalidNTupleIndex = std::uint64_t(-1);

/// Regular, known compression settings have the form algorithm * 100 + level, e.g. 101, 505, ...
constexpr int kNTupleUnknownCompression = -1;

/// Distinguishes elements of the same type within a descriptor, e.g. different fields
using DescriptorId_t = std::uint64_t;
constexpr DescriptorId_t kInvalidDescriptorId = std::uint64_t(-1);
Expand Down
6 changes: 3 additions & 3 deletions tree/ntuple/v7/inc/ROOT/RNTupleWriteOptions.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public:
// clang-format on

protected:
int fCompression{RCompressionSetting::EDefaults::kUseGeneralPurpose};
std::uint32_t fCompression{RCompressionSetting::EDefaults::kUseGeneralPurpose};
/// Approximation of the target compressed cluster size
std::size_t fApproxZippedClusterSize = 128 * 1024 * 1024;
/// Memory limit for committing a cluster: with very high compression ratio, we need a limit
Expand Down Expand Up @@ -103,8 +103,8 @@ public:
virtual ~RNTupleWriteOptions() = default;
virtual std::unique_ptr<RNTupleWriteOptions> Clone() const;

int GetCompression() const { return fCompression; }
void SetCompression(int val) { fCompression = val; }
std::uint32_t GetCompression() const { return fCompression; }
void SetCompression(std::uint32_t val) { fCompression = val; }
void SetCompression(RCompressionSetting::EAlgorithm::EValues algorithm, int compressionLevel)
{
fCompression = CompressionSettings(algorithm, compressionLevel);
Expand Down
4 changes: 1 addition & 3 deletions tree/ntuple/v7/src/RNTupleDescriptor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,7 @@ ROOT::RResult<void> ROOT::Experimental::Internal::RClusterDescriptorBuilder::Com
return R__FAIL("column ID mismatch");
if (fCluster.fColumnRanges.count(physicalId) > 0)
return R__FAIL("column ID conflict");
RClusterDescriptor::RColumnRange columnRange{physicalId, firstElementIndex, 0};
columnRange.fCompressionSettings = compressionSettings;
RClusterDescriptor::RColumnRange columnRange{physicalId, firstElementIndex, 0, compressionSettings};
for (const auto &pi : pageRange.fPageInfos) {
columnRange.fNElements += pi.fNElements;
}
Expand All @@ -729,7 +728,6 @@ ROOT::Experimental::Internal::RClusterDescriptorBuilder::MarkSuppressedColumnRan

RClusterDescriptor::RColumnRange columnRange;
columnRange.fPhysicalColumnId = physicalId;
columnRange.fCompressionSettings = kNTupleUnknownCompression;
columnRange.fIsSuppressed = true;
fCluster.fColumnRanges[physicalId] = columnRange;
return RResult<void>::Success();
Expand Down
2 changes: 1 addition & 1 deletion tree/ntuple/v7/src/RNTupleDescriptorFmt.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void ROOT::Experimental::RNTupleDescriptor::PrintInfo(std::ostream &output) cons

info.fNElements += columnRange.fNElements;
if (compression == -1) {
compression = columnRange.fCompressionSettings;
compression = columnRange.fCompressionSettings.value();
}
const auto &pageRange = cluster.second.GetPageRange(column.second.GetPhysicalId());
auto idx = cluster2Idx[cluster.first];
Expand Down
31 changes: 16 additions & 15 deletions tree/ntuple/v7/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ try {
"RNTuple::Merge",
"Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
}
int compression = kNTupleUnknownCompression;
std::optional<std::uint32_t> compression;
if (firstSrcComp) {
// user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
// (do nothing here, the compression will be fetched below)
Expand All @@ -96,7 +96,7 @@ try {
} else {
// user passed no compression-related options: use default
compression = RCompressionSetting::EDefaults::kUseGeneralPurpose;
Info("RNTuple::Merge", "Using the default compression: %d", compression);
Info("RNTuple::Merge", "Using the default compression: %u", *compression);
}

// The remaining entries are the input files
Expand All @@ -113,7 +113,7 @@ try {
}

auto source = RPageSourceFile::CreateFromAnchor(*anchor);
if (compression == kNTupleUnknownCompression) {
if (!compression) {
// Get the compression of this RNTuple and use it as the output compression.
// We currently assume all column ranges have the same compression, so we just peek at the first one.
source->Attach();
Expand All @@ -138,15 +138,15 @@ try {
inFile->GetName());
return -1;
}
compression = (*firstColRange).fCompressionSettings;
Info("RNTuple::Merge", "Using the first RNTuple's compression: %d", compression);
compression = (*firstColRange).fCompressionSettings.value();
Info("RNTuple::Merge", "Using the first RNTuple's compression: %u", *compression);
}
sources.push_back(std::move(source));
}

RNTupleWriteOptions writeOpts;
assert(compression != kNTupleUnknownCompression);
writeOpts.SetCompression(compression);
assert(compression);
writeOpts.SetCompression(*compression);
auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);

// If we already have an existing RNTuple, copy over its descriptor to support incremental merging
Expand All @@ -166,7 +166,7 @@ try {
// Now merge
RNTupleMerger merger;
RNTupleMergeOptions mergerOpts;
mergerOpts.fCompressionSettings = compression;
mergerOpts.fCompressionSettings = *compression;
merger.Merge(sourcePtrs, *destination, mergerOpts).ThrowOnError();

// Provide the caller with a merged anchor object (even though we've already
Expand Down Expand Up @@ -197,7 +197,7 @@ struct RChangeCompressionFunc {
sealConf.fElement = &fDstColElement;
sealConf.fPage = &page;
sealConf.fBuffer = fBuffer;
sealConf.fCompressionSetting = fMergeOptions.fCompressionSettings;
sealConf.fCompressionSetting = fMergeOptions.fCompressionSettings.value();
sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
auto refSealedPage = RPageSink::SealPage(sealConf);
fSealedPage = refSealedPage;
Expand Down Expand Up @@ -553,11 +553,12 @@ void RNTupleMerger::MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t
sealedPages.resize(pages.fPageInfos.size());

// Each column range potentially has distinct compression settings
const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
const bool needsCompressionChange = colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings;
const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings.value();
const bool needsCompressionChange =
colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value();
if (needsCompressionChange && mergeData.fMergeOpts.fExtraVerbose)
Info("RNTuple::Merge", "Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
colRangeCompressionSettings, mergeData.fMergeOpts.fCompressionSettings);
colRangeCompressionSettings, mergeData.fMergeOpts.fCompressionSettings.value());

size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
// If the column range already has the right compression we don't need to allocate any new buffer, so we don't
Expand Down Expand Up @@ -886,12 +887,12 @@ RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination, c
RNTupleMergeOptions mergeOpts = mergeOptsIn;
{
const auto dstCompSettings = destination.GetWriteOptions().GetCompression();
if (mergeOpts.fCompressionSettings == kNTupleUnknownCompression) {
if (!mergeOpts.fCompressionSettings) {
mergeOpts.fCompressionSettings = dstCompSettings;
} else if (mergeOpts.fCompressionSettings != dstCompSettings) {
} else if (*mergeOpts.fCompressionSettings != dstCompSettings) {
return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
"sink! (opts: ") +
std::to_string(mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
std::to_string(*mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
") This is currently unsupported.");
}
}
Expand Down
2 changes: 1 addition & 1 deletion tree/ntuple/v7/src/RNTupleSerialize.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,7 @@ ROOT::Experimental::Internal::RNTupleSerializer::SerializePageList(void *buffer,
pos += SerializeLocator(pi.fLocator, *where);
}
pos += SerializeInt64(columnRange.fFirstElementIndex, *where);
pos += SerializeUInt32(columnRange.fCompressionSettings, *where);
pos += SerializeUInt32(columnRange.fCompressionSettings.value(), *where);
}

pos += SerializeFramePostscript(buffer ? innerFrame : nullptr, pos - innerFrame);
Expand Down
7 changes: 4 additions & 3 deletions tree/ntuple/v7/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,8 @@ void ROOT::Experimental::Internal::RPagePersistentSink::InitFromDescriptor(const
R__ASSERT(columnRange.fPhysicalColumnId == i);
const auto &pageRange = cluster.GetPageRange(i);
R__ASSERT(pageRange.fPhysicalColumnId == i);
clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
pageRange);
clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
columnRange.fCompressionSettings.value(), pageRange);
fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
}
fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
Expand Down Expand Up @@ -1106,7 +1106,8 @@ void ROOT::Experimental::Internal::RPagePersistentSink::CommitStagedClusters(std
clusterBuilder.MarkSuppressedColumnRange(colId);
} else {
clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].fFirstElementIndex,
fOpenColumnRanges[colId].fCompressionSettings, columnInfo.fPageRange);
fOpenColumnRanges[colId].fCompressionSettings.value(),
columnInfo.fPageRange);
fOpenColumnRanges[colId].fFirstElementIndex += columnInfo.fNElements;
}
}
Expand Down
7 changes: 4 additions & 3 deletions tree/ntuple/v7/test/ntuple_merger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -798,13 +798,13 @@ TEST(RNTupleMerger, MergeThroughTBufferMerger)
EXPECT_EQ(reader->GetNEntries(), 10);
}

static bool VerifyPageCompression(const std::string_view fileName, int expectedComp)
static bool VerifyPageCompression(const std::string_view fileName, std::uint32_t expectedComp)
{
// Check that the advertised compression is correct
bool ok = true;
{
auto reader = RNTupleReader::Open("ntuple", fileName);
auto compSettings = reader->GetDescriptor().GetClusterDescriptor(0).GetColumnRange(0).fCompressionSettings;
auto compSettings = *reader->GetDescriptor().GetClusterDescriptor(0).GetColumnRange(0).fCompressionSettings;
if (compSettings != expectedComp) {
std::cerr << "Advertised compression is wrong: " << compSettings << " instead of " << expectedComp << "\n";
ok = false;
Expand All @@ -824,7 +824,8 @@ static bool VerifyPageCompression(const std::string_view fileName, int expectedC
source->LoadSealedPage(0, {0, 0}, sealedPage);

// size_t uncompSize = sealedPage.GetNElements() * colElement->GetSize();
int compAlgo = R__getCompressionAlgorithm((const unsigned char *)sealedPage.GetBuffer(), sealedPage.GetDataSize());
std::uint32_t compAlgo =
R__getCompressionAlgorithm((const unsigned char *)sealedPage.GetBuffer(), sealedPage.GetDataSize());
if (compAlgo == ROOT::RCompressionSetting::EAlgorithm::kUndefined)
compAlgo = 0;
if (compAlgo != (expectedComp / 100)) {
Expand Down
7 changes: 3 additions & 4 deletions tree/ntuple/v7/test/ntuple_multi_column.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

TEST(RNTuple, MultiColumnRepresentationSimple)
{
using ROOT::Experimental::kNTupleUnknownCompression;
FileRaii fileGuard("test_ntuple_multi_column_representation_simple.root");

{
Expand Down Expand Up @@ -53,7 +52,7 @@ TEST(RNTuple, MultiColumnRepresentationSimple)
EXPECT_TRUE(clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fIsSuppressed);
EXPECT_EQ(1u, clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fNElements);
EXPECT_EQ(0u, clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fFirstElementIndex);
EXPECT_EQ(kNTupleUnknownCompression, clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);
EXPECT_FALSE(clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);

const auto &clusterDesc1 = desc.GetClusterDescriptor(1);
EXPECT_FALSE(clusterDesc1.GetColumnRange(colDesc16.GetPhysicalId()).fIsSuppressed);
Expand All @@ -62,7 +61,7 @@ TEST(RNTuple, MultiColumnRepresentationSimple)
EXPECT_TRUE(clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fIsSuppressed);
EXPECT_EQ(1u, clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fNElements);
EXPECT_EQ(1u, clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fFirstElementIndex);
EXPECT_EQ(kNTupleUnknownCompression, clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fCompressionSettings);
EXPECT_FALSE(clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fCompressionSettings);

const auto &clusterDesc2 = desc.GetClusterDescriptor(2);
EXPECT_FALSE(clusterDesc2.GetColumnRange(colDesc32.GetPhysicalId()).fIsSuppressed);
Expand All @@ -71,7 +70,7 @@ TEST(RNTuple, MultiColumnRepresentationSimple)
EXPECT_TRUE(clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fIsSuppressed);
EXPECT_EQ(1u, clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fNElements);
EXPECT_EQ(2u, clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fFirstElementIndex);
EXPECT_EQ(kNTupleUnknownCompression, clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);
EXPECT_FALSE(clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);

auto ptrPx = reader->GetModel().GetDefaultEntry().GetPtr<float>("px");
reader->LoadEntry(0);
Expand Down
7 changes: 3 additions & 4 deletions tree/ntuple/v7/test/ntuple_parallel_writer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ TEST(RNTupleParallelWriter, Staged)
TEST(RNTupleParallelWriter, StagedMultiColumn)
{
// Based on MultiColumnRepresentationSimple from ntuple_multi_column.cxx
using ROOT::Experimental::kNTupleUnknownCompression;
FileRaii fileGuard("test_ntuple_parallel_staged_multi_column.root");

{
Expand Down Expand Up @@ -217,7 +216,7 @@ TEST(RNTupleParallelWriter, StagedMultiColumn)
EXPECT_TRUE(clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fIsSuppressed);
EXPECT_EQ(1u, clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fNElements);
EXPECT_EQ(0u, clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fFirstElementIndex);
EXPECT_EQ(kNTupleUnknownCompression, clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);
EXPECT_FALSE(clusterDesc0.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);

const auto &clusterDesc1 = desc.GetClusterDescriptor(1);
EXPECT_FALSE(clusterDesc1.GetColumnRange(colDesc16.GetPhysicalId()).fIsSuppressed);
Expand All @@ -226,7 +225,7 @@ TEST(RNTupleParallelWriter, StagedMultiColumn)
EXPECT_TRUE(clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fIsSuppressed);
EXPECT_EQ(1u, clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fNElements);
EXPECT_EQ(1u, clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fFirstElementIndex);
EXPECT_EQ(kNTupleUnknownCompression, clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fCompressionSettings);
EXPECT_FALSE(clusterDesc1.GetColumnRange(colDesc32.GetPhysicalId()).fCompressionSettings);

const auto &clusterDesc2 = desc.GetClusterDescriptor(2);
EXPECT_FALSE(clusterDesc2.GetColumnRange(colDesc32.GetPhysicalId()).fIsSuppressed);
Expand All @@ -235,7 +234,7 @@ TEST(RNTupleParallelWriter, StagedMultiColumn)
EXPECT_TRUE(clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fIsSuppressed);
EXPECT_EQ(1u, clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fNElements);
EXPECT_EQ(2u, clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fFirstElementIndex);
EXPECT_EQ(kNTupleUnknownCompression, clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);
EXPECT_FALSE(clusterDesc2.GetColumnRange(colDesc16.GetPhysicalId()).fCompressionSettings);

auto ptrPx = reader->GetModel().GetDefaultEntry().GetPtr<float>("px");
reader->LoadEntry(0);
Expand Down
Loading
Loading