Harden against object lifetime on exceptions #1928

Status: Open. 4 commits to merge into base: master.
4 changes: 3 additions & 1 deletion cpp/arcticdb/CMakeLists.txt
@@ -945,6 +945,7 @@ if(${TEST})
util/test/test_storage_lock.cpp
util/test/test_string_pool.cpp
util/test/test_string_utils.cpp
+    util/test/random_throw.hpp
util/test/test_tracing_allocator.cpp
version/test/test_append.cpp
version/test/test_sparse.cpp
@@ -957,7 +958,8 @@ if(${TEST})
version/test/version_map_model.hpp
python/python_handlers.cpp
storage/test/common.hpp
-    version/test/test_sort_index.cpp)
+    version/test/test_sort_index.cpp
+    )

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

21 changes: 5 additions & 16 deletions cpp/arcticdb/async/tasks.hpp
@@ -25,6 +25,7 @@
#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/util/constructors.hpp>
#include <arcticdb/codec/codec.hpp>
+#include <arcticdb/util/test/random_throw.hpp>

#include <type_traits>
#include <ranges>
@@ -90,6 +91,7 @@ struct EncodeAtomTask : BaseTask {

storage::KeySegmentPair encode() {
ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
+    ARCTICDB_DEBUG_THROW(5)
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
auto content_hash = get_segment_hash(enc_seg);

Expand Down Expand Up @@ -128,6 +130,7 @@ struct EncodeSegmentTask : BaseTask {

storage::KeySegmentPair operator()() {
ARCTICDB_SAMPLE(EncodeSegmentTask, 0)
+    ARCTICDB_DEBUG_THROW(5)
return encode();
}
};
@@ -484,6 +487,7 @@ struct MemSegmentProcessingTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask)

std::vector<EntityId> operator()() {
+    ARCTICDB_DEBUG_THROW(5)
for (auto it = clauses_.cbegin(); it != clauses_.cend(); ++it) {
entity_ids_ = (*it)->process(std::move(entity_ids_));

@@ -496,22 +500,6 @@

};

-struct MemSegmentFunctionTask : BaseTask {
-    stream::StreamSource::DecodeContinuation func_;
-
-    explicit MemSegmentFunctionTask(
-            stream::StreamSource::DecodeContinuation&& func) :
-        func_(std::move(func)) {
-    }
-
-    ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentFunctionTask)
-
-    folly::Unit operator()(std::pair<VariantKey, SegmentInMemory> &&seg_pair) {
-        func_(std::move(seg_pair.second));
-        return folly::Unit{};
-    }
-};

struct DecodeMetadataTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataTask)

@@ -558,6 +546,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {

std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
+    ARCTICDB_DEBUG_THROW(5)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

10 changes: 9 additions & 1 deletion cpp/arcticdb/processing/clause.cpp
@@ -17,9 +17,9 @@

#include <arcticdb/processing/clause.hpp>
#include <arcticdb/pipeline/column_stats.hpp>
-#include <arcticdb/pipeline/value_set.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/stream/segment_aggregator.hpp>
+#include <arcticdb/util/test/random_throw.hpp>
#include <ankerl/unordered_dense.h>

namespace arcticdb {
Expand Down Expand Up @@ -56,6 +56,7 @@ class GroupingMap {

template<typename T>
std::shared_ptr<ankerl::unordered_dense::map<T, size_t>> get() {
+    ARCTICDB_DEBUG_THROW(5)
return util::variant_match(map_,
[that = this](const std::monostate &) {
that->map_ = std::make_shared<ankerl::unordered_dense::map<T, size_t>>();
@@ -133,6 +134,7 @@ std::string FilterClause::to_string() const {
}

std::vector<EntityId> ProjectClause::process(std::vector<EntityId>&& entity_ids) const {
+    ARCTICDB_DEBUG_THROW(5)
if (entity_ids.empty()) {
return {};
}
@@ -171,6 +173,8 @@ std::vector<EntityId> ProjectClause::process(std::vector<EntityId>&& entity_ids)
AggregationClause::AggregationClause(const std::string& grouping_column,
const std::vector<NamedAggregator>& named_aggregators):
grouping_column_(grouping_column) {
+    ARCTICDB_DEBUG_THROW(5)

clause_info_.input_structure_ = ProcessingStructure::HASH_BUCKETED;
clause_info_.can_combine_with_column_selection_ = false;
clause_info_.index_ = NewIndex(grouping_column_);
@@ -219,6 +223,7 @@ std::vector<std::vector<EntityId>> AggregationClause::structure_for_processing(s
for (auto& res_element: res) {
res_element.reserve(entity_ids_vec.size());
}
+    ARCTICDB_DEBUG_THROW(5)
// Experimentation shows flattening the entities into a single vector and a single call to
// component_manager_->get is faster than not flattening and making multiple calls
auto entity_ids = flatten_entities(std::move(entity_ids_vec));
@@ -232,6 +237,7 @@
}

std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_ids) const {
+    ARCTICDB_DEBUG_THROW(5)
if (entity_ids.empty()) {
return {};
}
@@ -590,6 +596,8 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
seg.add_column(scalar_field(DataType::NANOSECONDS_UTC64, index_column_name), output_index_column);
seg.descriptor().set_index(IndexDescriptorImpl(1, IndexDescriptor::Type::TIMESTAMP));
auto& string_pool = seg.string_pool();

+    ARCTICDB_DEBUG_THROW(5)
for (const auto& aggregator: aggregators_) {
std::vector<std::optional<ColumnWithStrings>> input_agg_columns;
input_agg_columns.reserve(row_slices.size());
4 changes: 3 additions & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
@@ -20,6 +20,7 @@
#include <arcticdb/entity/serialized_key.hpp>
#include <arcticdb/storage/storage.hpp>
#include <arcticdb/storage/storage_options.hpp>
+#include <arcticdb/util/test/random_throw.hpp>

#include <folly/gen/Base.h>

@@ -181,7 +182,7 @@ bool LmdbStorage::do_key_exists(const VariantKey&key) {
ARCTICDB_SAMPLE(LmdbStorageKeyExists, 0)
auto txn = ::lmdb::txn::begin(env(), nullptr, MDB_RDONLY);
ARCTICDB_SUBSAMPLE(LmdbStorageInTransaction, 0)

+    ARCTICDB_DEBUG_THROW(5)
auto db_name = fmt::format("{}", variant_key_type(key));
ARCTICDB_SUBSAMPLE(LmdbStorageOpenDb, 0)
auto stored_key = to_serialized_key(key);
@@ -201,6 +202,7 @@ std::vector<VariantKey> LmdbStorage::do_remove_internal(Composite<VariantKey>&&
auto fmt_db = [](auto &&k) { return variant_key_type(k); };
std::vector<VariantKey> failed_deletes;

+    ARCTICDB_DEBUG_THROW(5)
(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach([&](auto &&group) {
auto db_name = fmt::format("{}", group.key());
ARCTICDB_SUBSAMPLE(LmdbStorageOpenDb, 0)
22 changes: 22 additions & 0 deletions cpp/arcticdb/util/test/random_throw.hpp
@@ -0,0 +1,22 @@
/* Copyright 2024 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#pragma once

#include <stdexcept>
#include <cstdlib>

#ifdef GENERATE_RANDOM_EXCEPTIONS
#define ARCTICDB_DEBUG_THROW(percentage) \
do { \
if (static_cast<double>(std::rand()) / RAND_MAX * 100 < percentage) { \
throw std::runtime_error("Exception intentionally thrown"); \
} \
} while(0);
#else
#define ARCTICDB_DEBUG_THROW(percentage)
#endif
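A note on the shape of the macro: the do { ... } while(0) wrapper makes the expansion behave as a single statement, and when GENERATE_RANDOM_EXCEPTIONS is not defined the macro expands to nothing, so release builds are unaffected. Below is a minimal sketch of how the fault injection could be exercised in a standalone harness; the header and build flag are from this PR, while the seeding and the loop are illustrative assumptions:

    // Illustrative harness, not part of the PR.
    // Build with -DGENERATE_RANDOM_EXCEPTIONS to arm the macro.
    #include <arcticdb/util/test/random_throw.hpp>
    #include <cstdlib>
    #include <iostream>

    int do_work() {
        ARCTICDB_DEBUG_THROW(5) // ~5% chance of throwing per call when armed
        return 42;
    }

    int main() {
        std::srand(0); // assumed seeding, for a reproducible failure sequence
        int failures = 0;
        for (int i = 0; i < 1000; ++i) {
            try {
                do_work();
            } catch (const std::runtime_error&) {
                ++failures; // the exception-safety path this PR hardens
            }
        }
        std::cout << "injected failures: " << failures << '\n'; // roughly 50 expected
    }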

11 changes: 11 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -1012,8 +1012,19 @@
log::version().debug("Compacting incomplete symbol {}", stream_id);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
+    auto pipeline_context = std::make_shared<PipelineContext>();
+    pipeline_context->stream_id_ = stream_id;
+    pipeline_context->version_id_ = update_info.next_version_id_;
+
+    auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store(), options);

auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());

Check failure on line 1022 in cpp/arcticdb/version/local_versioned_engine.cpp

GitHub Actions / code_coverage and GitHub Actions / Linux C++ Tests / compile (linux, linux, manylinux_x86_64, /tmp/cpp_build, *.gz, *.so, *.[ao], vcpkg_installed, mon...):

too few arguments to function ‘arcticdb::VersionedItem arcticdb::version_store::compact_incomplete_impl(const std::shared_ptr<arcticdb::Store>&, const StreamId&, const std::optional<arcticc::pb2::descriptors_pb2::UserDefinedMetadata>&, const arcticdb::version_store::UpdateInfo&, const arcticdb::version_store::CompactIncompleteOptions&, const arcticdb::WriteOptions&, std::shared_ptr<arcticdb::pipelines::PipelineContext>&)’

GitHub Actions / Windows C++ Tests / compile (windows, windows-cl, win_amd64, C:/cpp_build, C:/vcpkg_packages, *.pdb, *.lib, *.ilk, *....):

'arcticdb::version_store::compact_incomplete_impl': function does not take 6 arguments
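The annotations quote the updated signature, whose seventh parameter is a std::shared_ptr<arcticdb::pipelines::PipelineContext>&. Presumably the failing call only needs the context created a few lines above passed through; a hypothetical fix, inferred from the compiler message rather than from any commit in this PR:

    auto versioned_item = compact_incomplete_impl(
        store_, stream_id, user_meta, update_info, options, get_write_options(),
        pipeline_context);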

+    delete_incomplete_keys(*pipeline_context, *store());
+    if(delete_keys_on_failure)
+        delete_keys_on_failure->release();

write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
add_to_symbol_list_on_compaction(stream_id, options, update_info);

90 changes: 45 additions & 45 deletions cpp/arcticdb/version/version_core.cpp
@@ -1386,10 +1386,10 @@ VersionedItem collate_and_write(
});
}

-void delete_incomplete_keys(PipelineContext* pipeline_context, Store* store) {
+void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store) {
Collaborator: Guessing PipelineContext can be const&

Collaborator (author): A rational person would think so, but because of irritating reasons the function returning the iterator into the pipeline context can't be made easily const, and we want this class to just go away, so I don't want to spend too much time (any time) polishing it.
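A guess at why the const& suggestion is awkward, assuming incompletes_begin() is a non-const member (as its use below suggests; illustration only):

    // Hypothetical: with only a non-const iterator accessor on PipelineContext,
    // taking the context by const& fails to compile.
    void delete_incomplete_keys(const PipelineContext& ctx, Store& store) {
        auto sk = ctx.incompletes_begin(); // error: non-const member called on const reference
    }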

std::vector<entity::VariantKey> keys_to_delete;
-    keys_to_delete.reserve(pipeline_context->slice_and_keys_.size() - pipeline_context->incompletes_after());
-    for (auto sk = pipeline_context->incompletes_begin(); sk != pipeline_context->end(); ++sk) {
+    keys_to_delete.reserve(pipeline_context.slice_and_keys_.size() - pipeline_context.incompletes_after());
+    for (auto sk = pipeline_context.incompletes_begin(); sk != pipeline_context.end(); ++sk) {
const auto& slice_and_key = sk->slice_and_key();
if (ARCTICDB_LIKELY(slice_and_key.key().type() == KeyType::APPEND_DATA)) {
keys_to_delete.emplace_back(slice_and_key.key());
@@ -1401,41 +1401,45 @@ void delete_incomplete_keys(PipelineContext* pipeline_context, Store* store) {
);
}
}
-    store->remove_keys(keys_to_delete).get();
+    store.remove_keys(keys_to_delete).get();
}

-class IncompleteKeysRAII {
-  public:
-    IncompleteKeysRAII() = default;
-    IncompleteKeysRAII(
-            std::shared_ptr<PipelineContext> pipeline_context,
-            std::shared_ptr<Store> store,
-            const CompactIncompleteOptions* options)
-        : context_(pipeline_context),
-          store_(store),
-          options_(options) {
-    }
-    ARCTICDB_MOVE_ONLY_DEFAULT(IncompleteKeysRAII)
-
-    ~IncompleteKeysRAII() {
-        if (context_ && store_) {
-            if (context_->incompletes_after_) {
-                delete_incomplete_keys(context_.get(), store_.get());
-            } else {
-                // If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the
-                // context thus they must be read manually.
-                const std::vector<VariantKey> entries =
-                    read_incomplete_keys_for_symbol(store_, context_->stream_id_, options_->via_iteration_);
-                store_->remove_keys(entries).get();
-            }
-        }
-    }
-
-  private:
-    std::shared_ptr<PipelineContext> context_ = nullptr;
-    std::shared_ptr<Store> store_ = nullptr;
-    const CompactIncompleteOptions* options_ = nullptr;
-};
+DeleteIncompleteKeysOnExit::DeleteIncompleteKeysOnExit(
+        std::shared_ptr<PipelineContext> pipeline_context,
+        std::shared_ptr<Store> store,
+        bool via_iteration)
+    : context_(pipeline_context),
+      store_(store),
+      via_iteration_(via_iteration) {
+}
+
+DeleteIncompleteKeysOnExit::~DeleteIncompleteKeysOnExit() {
+    if(released_)
+        return;
+
+    try {
+        if (context_->incompletes_after_) {
+            delete_incomplete_keys(*context_, *store_);
+        } else {
+            // If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the
+            // context thus they must be read manually.
+            auto entries = read_incomplete_keys_for_symbol(store_, context_->stream_id_, via_iteration_);
+            store_->remove_keys(entries).get();
+        }
+    } catch (...) {
+        // Don't emit exceptions from destructor
+    }
+}
+
+std::optional<DeleteIncompleteKeysOnExit> get_delete_keys_on_failure(
+        const std::shared_ptr<PipelineContext>& pipeline_context,
+        const std::shared_ptr<Store>& store,
+        const CompactIncompleteOptions& options) {
+    if(options.delete_staged_data_on_failure_)
+        return std::make_optional<DeleteIncompleteKeysOnExit>(pipeline_context, store, options.via_iteration_);
+    else
+        return std::nullopt;
+}
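Taken together, the call sites follow the pattern sketched below (condensed from sort_merge_impl and compact_incomplete_impl in this diff; the pipeline work itself is a placeholder):

    // Arm the guard only when the caller asked for cleanup on failure.
    auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store, options);

    // ... read incompletes into the pipeline and write the compacted version;
    // if anything here throws, the guard's destructor deletes the staged keys ...

    // Success: remove the incomplete keys explicitly, then disarm the guard so
    // its destructor becomes a no-op.
    delete_incomplete_keys(*pipeline_context, *store);
    if (delete_keys_on_failure)
        delete_keys_on_failure->release();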

VersionedItem sort_merge_impl(
const std::shared_ptr<Store>& store,
@@ -1450,9 +1454,7 @@ VersionedItem sort_merge_impl(
auto read_query = std::make_shared<ReadQuery>();

std::optional<SortedValue> previous_sorted_value;
-    const IncompleteKeysRAII incomplete_keys_raii = options.delete_staged_data_on_failure_
-        ? IncompleteKeysRAII{pipeline_context, store, &options}
-        : IncompleteKeysRAII{};
+    auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store, options);
if(options.append_ && update_info.previous_index_key_.has_value()) {
read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), *read_query, ReadOptions{});
if (!write_options.dynamic_schema) {
@@ -1553,9 +1555,11 @@ VersionedItem sort_merge_impl(
keys,
pipeline_context->incompletes_after(),
norm_meta);
-    if (!options.delete_staged_data_on_failure_) {
-        delete_incomplete_keys(pipeline_context.get(), store.get());
-    }
+
+    delete_incomplete_keys(*pipeline_context, *store);
+    if(delete_keys_on_failure)
+        delete_keys_on_failure->release();

return vit;
}

Expand All @@ -1565,20 +1569,15 @@ VersionedItem compact_incomplete_impl(
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const UpdateInfo& update_info,
const CompactIncompleteOptions& options,
-    const WriteOptions& write_options) {
+    const WriteOptions& write_options,
+    std::shared_ptr<PipelineContext>& pipeline_context) {

-    auto pipeline_context = std::make_shared<PipelineContext>();
-    pipeline_context->stream_id_ = stream_id;
-    pipeline_context->version_id_ = update_info.next_version_id_;
ReadQuery read_query;
ReadOptions read_options;
read_options.set_dynamic_schema(true);

std::optional<SegmentInMemory> last_indexed;
std::optional<SortedValue> previous_sorted_value;
-    const IncompleteKeysRAII incomplete_keys_raii = options.delete_staged_data_on_failure_
-        ? IncompleteKeysRAII{pipeline_context, store, &options}
-        : IncompleteKeysRAII{};

if(options.append_ && update_info.previous_index_key_.has_value()) {
read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, read_options);
if (!write_options.dynamic_schema) {
@@ -1648,9 +1647,10 @@ VersionedItem compact_incomplete_impl(
keys,
pipeline_context->incompletes_after(),
user_meta);
-    if (!options.delete_staged_data_on_failure_) {
-        delete_incomplete_keys(pipeline_context.get(), store.get());
-    }

return vit;
}
