Skip to content

Commit

Permalink
Don't delete staged on failure to write version key
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Nov 13, 2024
1 parent d35e57c commit 1da3a6a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 39 deletions.
11 changes: 11 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,8 +1012,19 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
log::version().debug("Compacting incomplete symbol {}", stream_id);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
auto pipeline_context = std::make_shared<PipelineContext>();
pipeline_context->stream_id_ = stream_id;
pipeline_context->version_id_ = update_info.next_version_id_;


auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store(), options);

auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());

Check failure on line 1022 in cpp/arcticdb/version/local_versioned_engine.cpp

View workflow job for this annotation

GitHub Actions / code_coverage

too few arguments to function ‘arcticdb::VersionedItem arcticdb::version_store::compact_incomplete_impl(const std::shared_ptr<arcticdb::Store>&, const StreamId&, const std::optional<arcticc::pb2::descriptors_pb2::UserDefinedMetadata>&, const arcticdb::version_store::UpdateInfo&, const arcticdb::version_store::CompactIncompleteOptions&, const arcticdb::WriteOptions&, std::shared_ptr<arcticdb::pipelines::PipelineContext>&)’

Check failure on line 1022 in cpp/arcticdb/version/local_versioned_engine.cpp

View workflow job for this annotation

GitHub Actions / Linux C++ Tests / compile (linux, linux, manylinux_x86_64, /tmp/cpp_build, *.gz, *.so, *.[ao], vcpkg_installed, mon...

too few arguments to function ‘arcticdb::VersionedItem arcticdb::version_store::compact_incomplete_impl(const std::shared_ptr<arcticdb::Store>&, const StreamId&, const std::optional<arcticc::pb2::descriptors_pb2::UserDefinedMetadata>&, const arcticdb::version_store::UpdateInfo&, const arcticdb::version_store::CompactIncompleteOptions&, const arcticdb::WriteOptions&, std::shared_ptr<arcticdb::pipelines::PipelineContext>&)’

Check failure on line 1022 in cpp/arcticdb/version/local_versioned_engine.cpp

View workflow job for this annotation

GitHub Actions / Windows C++ Tests / compile (windows, windows-cl, win_amd64, C:/cpp_build, C:/vcpkg_packages, *.pdb, *.lib, *.ilk, *....

'arcticdb::version_store::compact_incomplete_impl': function does not take 6 arguments

delete_incomplete_keys(*pipeline_context, *store());
if(delete_keys_on_failure)
delete_keys_on_failure->release();

write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
add_to_symbol_list_on_compaction(stream_id, options, update_info);

Expand Down
57 changes: 19 additions & 38 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,46 +1404,32 @@ void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store) {
store.remove_keys(keys_to_delete).get();
}

class DeleteIncompleteKeysOnExit {
public:
DeleteIncompleteKeysOnExit(
DeleteIncompleteKeysOnExit::DeleteIncompleteKeysOnExit(
std::shared_ptr<PipelineContext> pipeline_context,
std::shared_ptr<Store> store,
bool via_iteration)
: context_(pipeline_context),
store_(store),
via_iteration_(via_iteration) {
}
ARCTICDB_NO_MOVE_OR_COPY(DeleteIncompleteKeysOnExit)

~DeleteIncompleteKeysOnExit() {
if(released_)
return;
DeleteIncompleteKeysOnExit::~DeleteIncompleteKeysOnExit() {
if(released_)
return;

try {
if (context_->incompletes_after_) {
delete_incomplete_keys(*context_, *store_);
} else {
// If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the
// context thus they must be read manually.
auto entries = read_incomplete_keys_for_symbol(store_, context_->stream_id_, via_iteration_);
store_->remove_keys(entries).get();
}
} catch (...) {
// Don't emit exceptions from destructor
try {
if (context_->incompletes_after_) {
delete_incomplete_keys(*context_, *store_);
} else {
// If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the
// context thus they must be read manually.
auto entries = read_incomplete_keys_for_symbol(store_, context_->stream_id_, via_iteration_);
store_->remove_keys(entries).get();
}
} catch (...) {
// Don't emit exceptions from destructor
}

void release() {
released_ = true;
}

private:
std::shared_ptr<PipelineContext> context_;
std::shared_ptr<Store> store_;
bool via_iteration_;
bool released_ = false;
};
}

std::optional<DeleteIncompleteKeysOnExit> get_delete_keys_on_failure(
const std::shared_ptr<PipelineContext>& pipeline_context,
Expand Down Expand Up @@ -1583,18 +1569,15 @@ VersionedItem compact_incomplete_impl(
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const UpdateInfo& update_info,
const CompactIncompleteOptions& options,
const WriteOptions& write_options) {
const WriteOptions& write_options,
std::shared_ptr<PipelineContext>& pipeline_context) {

auto pipeline_context = std::make_shared<PipelineContext>();
pipeline_context->stream_id_ = stream_id;
pipeline_context->version_id_ = update_info.next_version_id_;
ReadQuery read_query;
ReadOptions read_options;
read_options.set_dynamic_schema(true);

std::optional<SegmentInMemory> last_indexed;
std::optional<SortedValue> previous_sorted_value;
auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store, options);

if(options.append_ && update_info.previous_index_key_.has_value()) {
read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, read_options);
if (!write_options.dynamic_schema) {
Expand Down Expand Up @@ -1666,9 +1649,7 @@ VersionedItem compact_incomplete_impl(
user_meta);


delete_incomplete_keys(*pipeline_context, *store);
if(delete_keys_on_failure)
delete_keys_on_failure->release();


return vit;
}
Expand Down
31 changes: 30 additions & 1 deletion cpp/arcticdb/version/version_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ VersionedItem compact_incomplete_impl(
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const UpdateInfo& update_info,
const CompactIncompleteOptions& options,
const WriteOptions& write_options);
const WriteOptions& write_options,
std::shared_ptr<PipelineContext>& pipeline_context);

struct PredefragmentationInfo{
std::shared_ptr<PipelineContext> pipeline_context;
Expand Down Expand Up @@ -207,6 +208,34 @@ folly::Future<ReadVersionOutput> read_frame_for_version(
std::any& handler_data
);

class DeleteIncompleteKeysOnExit {
public:
DeleteIncompleteKeysOnExit(
std::shared_ptr<PipelineContext> pipeline_context,
std::shared_ptr<Store> store,
bool via_iteration);

ARCTICDB_NO_MOVE_OR_COPY(DeleteIncompleteKeysOnExit)

~DeleteIncompleteKeysOnExit();

void release() {
released_ = true;
}

private:
std::shared_ptr<PipelineContext> context_;
std::shared_ptr<Store> store_;
bool via_iteration_;
bool released_ = false;
};
void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store);

std::optional<DeleteIncompleteKeysOnExit> get_delete_keys_on_failure(
const std::shared_ptr<PipelineContext>& pipeline_context,
const std::shared_ptr<Store>& store,
const CompactIncompleteOptions& options);

} //namespace arcticdb::version_store

#define ARCTICDB_VERSION_CORE_H_
Expand Down

0 comments on commit 1da3a6a

Please sign in to comment.