diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index da52561a14..cc6764fa5b 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -1012,8 +1012,19 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic( log::version().debug("Compacting incomplete symbol {}", stream_id); auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id); + auto pipeline_context = std::make_shared(); + pipeline_context->stream_id_ = stream_id; + pipeline_context->version_id_ = update_info.next_version_id_; + + + auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store(), options); + auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options()); + delete_incomplete_keys(*pipeline_context, *store()); + if(delete_keys_on_failure) + delete_keys_on_failure->release(); + write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_); add_to_symbol_list_on_compaction(stream_id, options, update_info); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index d8c702621b..5c03aa2bec 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -1404,9 +1404,7 @@ void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store) { store.remove_keys(keys_to_delete).get(); } -class DeleteIncompleteKeysOnExit { -public: - DeleteIncompleteKeysOnExit( +DeleteIncompleteKeysOnExit::DeleteIncompleteKeysOnExit( std::shared_ptr pipeline_context, std::shared_ptr store, bool via_iteration) @@ -1414,36 +1412,24 @@ class DeleteIncompleteKeysOnExit { store_(store), via_iteration_(via_iteration) { } - ARCTICDB_NO_MOVE_OR_COPY(DeleteIncompleteKeysOnExit) - ~DeleteIncompleteKeysOnExit() { - if(released_) - return; +DeleteIncompleteKeysOnExit::~DeleteIncompleteKeysOnExit() { + if(released_) + return; - try { - if (context_->incompletes_after_) { - delete_incomplete_keys(*context_, *store_); - } else { - // If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the - // context thus they must be read manually. - auto entries = read_incomplete_keys_for_symbol(store_, context_->stream_id_, via_iteration_); - store_->remove_keys(entries).get(); - } - } catch (...) { - // Don't emit exceptions from destructor + try { + if (context_->incompletes_after_) { + delete_incomplete_keys(*context_, *store_); + } else { + // If an exception is thrown before read_incompletes_to_pipeline the keys won't be placed inside the + // context thus they must be read manually. + auto entries = read_incomplete_keys_for_symbol(store_, context_->stream_id_, via_iteration_); + store_->remove_keys(entries).get(); } + } catch (...) { + // Don't emit exceptions from destructor } - - void release() { - released_ = true; - } - -private: - std::shared_ptr context_; - std::shared_ptr store_; - bool via_iteration_; - bool released_ = false; -}; +} std::optional get_delete_keys_on_failure( const std::shared_ptr& pipeline_context, @@ -1583,18 +1569,15 @@ VersionedItem compact_incomplete_impl( const std::optional& user_meta, const UpdateInfo& update_info, const CompactIncompleteOptions& options, - const WriteOptions& write_options) { + const WriteOptions& write_options, + std::shared_ptr& pipeline_context) { - auto pipeline_context = std::make_shared(); - pipeline_context->stream_id_ = stream_id; - pipeline_context->version_id_ = update_info.next_version_id_; ReadQuery read_query; ReadOptions read_options; read_options.set_dynamic_schema(true); - std::optional last_indexed; std::optional previous_sorted_value; - auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store, options); + if(options.append_ && update_info.previous_index_key_.has_value()) { read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, read_options); if (!write_options.dynamic_schema) { @@ -1666,9 +1649,7 @@ VersionedItem compact_incomplete_impl( user_meta); - delete_incomplete_keys(*pipeline_context, *store); - if(delete_keys_on_failure) - delete_keys_on_failure->release(); + return vit; } diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index 7e04f29e7f..f2bdcd4091 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -150,7 +150,8 @@ VersionedItem compact_incomplete_impl( const std::optional& user_meta, const UpdateInfo& update_info, const CompactIncompleteOptions& options, - const WriteOptions& write_options); + const WriteOptions& write_options, + std::shared_ptr& pipeline_context); struct PredefragmentationInfo{ std::shared_ptr pipeline_context; @@ -207,6 +208,34 @@ folly::Future read_frame_for_version( std::any& handler_data ); +class DeleteIncompleteKeysOnExit { +public: + DeleteIncompleteKeysOnExit( + std::shared_ptr pipeline_context, + std::shared_ptr store, + bool via_iteration); + + ARCTICDB_NO_MOVE_OR_COPY(DeleteIncompleteKeysOnExit) + + ~DeleteIncompleteKeysOnExit(); + + void release() { + released_ = true; + } + +private: + std::shared_ptr context_; + std::shared_ptr store_; + bool via_iteration_; + bool released_ = false; +}; +void delete_incomplete_keys(PipelineContext& pipeline_context, Store& store); + +std::optional get_delete_keys_on_failure( + const std::shared_ptr& pipeline_context, + const std::shared_ptr& store, + const CompactIncompleteOptions& options); + } //namespace arcticdb::version_store #define ARCTICDB_VERSION_CORE_H_