From b748d8af5dc9b6a3a700fc296ba2c26ffb6beebe Mon Sep 17 00:00:00 2001 From: nico <109071306+NicoYuan1986@users.noreply.github.com> Date: Fri, 14 Jun 2024 17:51:57 +0800 Subject: [PATCH 01/10] test: add different datatype for some functions (#33869) Signed-off-by: nico --- tests/python_client/testcases/test_query.py | 35 ++++++++++++ tests/python_client/testcases/test_search.py | 60 ++++++++++++++++---- 2 files changed, 84 insertions(+), 11 deletions(-) diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py index efad358ec2b78..d1e25351c56bf 100644 --- a/tests/python_client/testcases/test_query.py +++ b/tests/python_client/testcases/test_query.py @@ -2363,6 +2363,20 @@ def test_query_output_binary_vec_field_after_index(self): res, _ = collection_w.query(default_term_expr, output_fields=[ct.default_binary_vec_field_name]) assert res[0].keys() == set(fields) + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("vector_data_type", ["FLOAT_VECTOR", "FLOAT16_VECTOR", "BFLOAT16_VECTOR"]) + def test_query_output_all_vector_type(self, vector_data_type): + """ + target: test query output different vector type + method: create index and specify vec field as output field + expected: return primary field and vec field + """ + collection_w, vectors = self.init_collection_general(prefix, True, + vector_data_type=vector_data_type)[0:2] + fields = [ct.default_int64_field_name, ct.default_float_vec_field_name] + res, _ = collection_w.query(default_term_expr, output_fields=[ct.default_float_vec_field_name]) + assert res[0].keys() == set(fields) + @pytest.mark.tags(CaseLabel.L2) def test_query_partition_repeatedly(self): """ @@ -3741,6 +3755,27 @@ def test_query_iterator_with_offset(self, offset): check_items={"count": ct.default_nb - offset, "batch_size": batch_size}) + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("vector_data_type", ["FLOAT_VECTOR", "FLOAT16_VECTOR", "BFLOAT16_VECTOR"]) + def 
test_query_iterator_output_different_vector_type(self, vector_data_type): + """ + target: test query iterator with output fields + method: 1. query iterator output different vector type + 2. check the result, expect pk + expected: query successfully + """ + # 1. initialize with data + batch_size = 400 + collection_w = self.init_collection_general(prefix, True, + vector_data_type=vector_data_type)[0] + # 2. query iterator + expr = "int64 >= 0" + collection_w.query_iterator(batch_size, expr=expr, + output_fields=[ct.default_float_vec_field_name], + check_task=CheckTasks.check_query_iterator, + check_items={"count": ct.default_nb, + "batch_size": batch_size}) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("batch_size", [10, 100, 777, 2000]) def test_query_iterator_with_different_batch_size(self, batch_size): diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py index 829f3b7f86396..e749d9fa8be59 100644 --- a/tests/python_client/testcases/test_search.py +++ b/tests/python_client/testcases/test_search.py @@ -5074,6 +5074,8 @@ def test_search_cosine_results_same_as_ip(self): # 4. check the search results for i in range(default_nq): assert res_ip[i].ids == res_cosine[i].ids + log.info(res_cosine[i].distances) + log.info(res_ip[i].distances) @pytest.mark.tags(CaseLabel.L2) def test_search_without_connect(self): @@ -5874,6 +5876,10 @@ def _async(self, request): def enable_dynamic_field(self, request): yield request.param + @pytest.fixture(scope="function", params=["FLOAT_VECTOR", "FLOAT16_VECTOR", "BFLOAT16_VECTOR"]) + def vector_data_type(self, request): + yield request.param + """ ****************************************************************** # The following are valid base cases @@ -5897,10 +5903,8 @@ def test_search_with_pagination(self, offset, limit, _async): collection_w = self.init_collection_general(prefix, True, auto_id=auto_id, dim=default_dim, enable_dynamic_field=enable_dynamic_field)[0] # 2. 
search pagination with offset - search_param = {"metric_type": "COSINE", - "params": {"nprobe": 10}, "offset": offset} - vectors = [[random.random() for _ in range(default_dim)] - for _ in range(default_nq)] + search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset} + vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)] search_res = collection_w.search(vectors[:default_nq], default_search_field, search_param, limit, default_search_exp, _async=_async, @@ -5937,10 +5941,8 @@ def test_search_string_with_pagination(self, offset, _async): self.init_collection_general(prefix, True, auto_id=auto_id, dim=default_dim, enable_dynamic_field=enable_dynamic_field)[0:4] # 2. search - search_param = {"metric_type": "COSINE", - "params": {"nprobe": 10}, "offset": offset} - vectors = [[random.random() for _ in range(default_dim)] - for _ in range(default_nq)] + search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset} + vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)] output_fields = [default_string_field_name, default_float_field_name] search_res = collection_w.search(vectors[:default_nq], default_search_field, search_param, default_limit, @@ -5999,6 +6001,40 @@ def test_search_binary_with_pagination(self, offset): assert sorted(search_res[0].distances, key=numpy.float32) == sorted( res[0].distances[offset:], key=numpy.float32) + @pytest.mark.tags(CaseLabel.L1) + def test_search_all_vector_type_with_pagination(self, vector_data_type): + """ + target: test search with pagination using different vector datatype + method: 1. connect and create a collection + 2. search pagination with offset + 3. search with offset+limit + 4. compare with the search results whose corresponding ids should be the same + expected: search successfully and ids is correct + """ + # 1. 
create a collection + auto_id = False + enable_dynamic_field = True + offset = 100 + limit = 20 + collection_w = self.init_collection_general(prefix, True, auto_id=auto_id, dim=default_dim, + enable_dynamic_field=enable_dynamic_field, + vector_data_type=vector_data_type)[0] + # 2. search pagination with offset + search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset} + vectors = cf.gen_vectors_based_on_vector_type(default_nq, default_dim, vector_data_type) + search_res = collection_w.search(vectors[:default_nq], default_search_field, + search_param, limit, + default_search_exp, + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": limit})[0] + # 3. search with offset+limit + res = collection_w.search(vectors[:default_nq], default_search_field, default_search_params, + limit + offset, default_search_exp)[0] + res_distance = res[0].distances[offset:] + # assert sorted(search_res[0].distances, key=numpy.float32) == sorted(res_distance, key=numpy.float32) + assert set(search_res[0].ids) == set(res[0].ids[offset:]) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("limit", [100, 3000, 10000]) def test_search_with_pagination_topK(self, limit, _async): @@ -9854,7 +9890,8 @@ class TestSearchIterator(TestcaseBase): """ Test case of search iterator """ @pytest.mark.tags(CaseLabel.L1) - def test_search_iterator_normal(self): + @pytest.mark.parametrize("vector_data_type", ["FLOAT_VECTOR", "FLOAT16_VECTOR", "BFLOAT16_VECTOR"]) + def test_search_iterator_normal(self, vector_data_type): """ target: test search iterator normal method: 1. search iterator @@ -9863,12 +9900,13 @@ def test_search_iterator_normal(self): """ # 1. 
initialize with data dim = 128 - collection_w = self.init_collection_general( - prefix, True, dim=dim, is_index=False)[0] + collection_w = self.init_collection_general(prefix, True, dim=dim, is_index=False, + vector_data_type=vector_data_type)[0] collection_w.create_index(field_name, {"metric_type": "L2"}) collection_w.load() # 2. search iterator search_params = {"metric_type": "L2"} + vectors = cf.gen_vectors_based_on_vector_type(1, dim, vector_data_type) batch_size = 200 collection_w.search_iterator(vectors[:1], field_name, search_params, batch_size, check_task=CheckTasks.check_search_iterator, From f67b6dc2b054bd5c5a057e73b108cf1c4889d0c8 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 14 Jun 2024 17:57:56 +0800 Subject: [PATCH 02/10] fix: DeleteData merge wrong data casuing data loss (#33820) See also: #33819 Signed-off-by: yangxuan --- internal/storage/data_codec.go | 4 ++-- internal/storage/data_codec_test.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 50de1efb9d35c..59f2b89ad451e 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -941,8 +941,8 @@ func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) { } func (data *DeleteData) Merge(other *DeleteData) { - data.Pks = append(other.Pks, other.Pks...) - data.Tss = append(other.Tss, other.Tss...) + data.Pks = append(data.Pks, other.Pks...) + data.Tss = append(data.Tss, other.Tss...) 
data.RowCount += other.RowCount data.memSize += other.Size() diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 1f383b5f2b11a..84820062d8056 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -878,9 +878,12 @@ func TestDeleteData(t *testing.T) { pks, err := GenInt64PrimaryKeys(1, 2, 3) require.NoError(t, err) + pks2, err := GenInt64PrimaryKeys(4, 5, 6) + require.NoError(t, err) + t.Run("merge", func(t *testing.T) { first := NewDeleteData(pks, []Timestamp{100, 101, 102}) - second := NewDeleteData(pks, []Timestamp{100, 101, 102}) + second := NewDeleteData(pks2, []Timestamp{103, 104, 105}) require.EqualValues(t, first.RowCount, second.RowCount) require.EqualValues(t, first.Size(), second.Size()) require.EqualValues(t, 3, first.RowCount) @@ -891,6 +894,8 @@ func TestDeleteData(t *testing.T) { assert.Equal(t, len(first.Tss), 6) assert.EqualValues(t, first.RowCount, 6) assert.EqualValues(t, first.Size(), 144) + assert.ElementsMatch(t, first.Pks, append(pks, pks2...)) + assert.ElementsMatch(t, first.Tss, []Timestamp{100, 101, 102, 103, 104, 105}) assert.NotNil(t, second) assert.EqualValues(t, 0, second.RowCount) From e422168f0958d7b5922fa7a84fec5588f01bde14 Mon Sep 17 00:00:00 2001 From: zhagnlu <1542303831@qq.com> Date: Sun, 16 Jun 2024 21:48:01 +0800 Subject: [PATCH 03/10] fix: readd timestamp index because segment timestamp not ordered (#33856) #33533 Signed-off-by: luzhang Co-authored-by: luzhang --- internal/core/src/segcore/InsertRecord.h | 4 ++ .../core/src/segcore/SegmentSealedImpl.cpp | 53 +++++++++++++++---- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index e4b208bfc24b4..7d62e303eeda8 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -594,6 +594,7 @@ struct InsertRecord { timestamps_.clear(); reserved = 0; 
ack_responder_.clear(); + timestamp_index_ = TimestampIndex(); pk2offset_->clear(); fields_data_.clear(); } @@ -610,6 +611,9 @@ struct InsertRecord { std::atomic reserved = 0; AckResponder ack_responder_; + // used for timestamps index of sealed segment + TimestampIndex timestamp_index_; + // pks to row offset std::unique_ptr pk2offset_; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 19962018a62e9..b6cac4f9371e5 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include "Utils.h" #include "Types.h" @@ -349,9 +348,19 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { offset += row_count; } + TimestampIndex index; + auto min_slice_length = num_rows < 4096 ? 1 : 4096; + auto meta = GenerateFakeSlices( + timestamps.data(), num_rows, min_slice_length); + index.set_length_meta(std::move(meta)); + // todo ::opt to avoid copy timestamps from field data + index.build_with(timestamps.data(), num_rows); + + // use special index std::unique_lock lck(mutex_); AssertInfo(insert_record_.timestamps_.empty(), "already exists"); insert_record_.timestamps_.fill_chunk_data(field_data); + insert_record_.timestamp_index_ = std::move(index); AssertInfo(insert_record_.timestamps_.num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); stats_.mem_size += sizeof(Timestamp) * data.row_count; @@ -1519,6 +1528,12 @@ SegmentSealedImpl::debug() const { void SegmentSealedImpl::LoadSegmentMeta( const proto::segcore::LoadSegmentMeta& segment_meta) { + std::unique_lock lck(mutex_); + std::vector slice_lengths; + for (auto& info : segment_meta.metas()) { + slice_lengths.push_back(info.row_count()); + } + insert_record_.timestamp_index_.set_length_meta(std::move(slice_lengths)); PanicInfo(NotImplemented, "unimplemented"); } @@ -1530,17 +1545,33 @@ 
SegmentSealedImpl::get_active_count(Timestamp ts) const { void SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk, - Timestamp ts) const { - auto row_count = this->get_row_count(); - auto& ts_vec = this->insert_record_.timestamps_; - auto iter = std::upper_bound( - boost::make_counting_iterator(static_cast(0)), - boost::make_counting_iterator(row_count), - ts, - [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; }); - for (size_t i = *iter; i < row_count; ++i) { - bitset_chunk.set(i); + Timestamp timestamp) const { + // TODO change the + AssertInfo(insert_record_.timestamps_.num_chunk() == 1, + "num chunk not equal to 1 for sealed segment"); + const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0); + AssertInfo(timestamps_data.size() == get_row_count(), + fmt::format("Timestamp size not equal to row count: {}, {}", + timestamps_data.size(), + get_row_count())); + auto range = insert_record_.timestamp_index_.get_active_range(timestamp); + + // range == (size_, size_) and size_ is this->timestamps_.size(). + // it means these data are all useful, we don't need to update bitset_chunk. + // It can be thought of as an OR operation with another bitmask that is all 0s, but it is not necessary to do so. + if (range.first == range.second && range.first == timestamps_data.size()) { + // just skip + return; + } + // range == (0, 0). it means these data can not be used, directly set bitset_chunk to all 1s. + // It can be thought of as an OR operation with another bitmask that is all 1s. 
+ if (range.first == range.second && range.first == 0) { + bitset_chunk.set(); + return; } + auto mask = TimestampIndex::GenerateBitset( + timestamp, range, timestamps_data.data(), timestamps_data.size()); + bitset_chunk |= mask; } bool From d43ec4db0b9255dd44c9a9cff5498bc49a22be6b Mon Sep 17 00:00:00 2001 From: zhagnlu <1542303831@qq.com> Date: Sun, 16 Jun 2024 21:51:58 +0800 Subject: [PATCH 04/10] enhance: support array bitmap index (#33527) #32900 --------- Signed-off-by: luzhang Co-authored-by: luzhang --- internal/core/src/common/Types.h | 28 ++ internal/core/src/index/BitmapIndex.cpp | 83 +++-- internal/core/src/index/BitmapIndex.h | 26 +- internal/core/src/index/HybridScalarIndex.cpp | 207 ++++++----- internal/core/src/index/HybridScalarIndex.h | 25 +- internal/core/src/index/IndexFactory.cpp | 124 +++---- internal/core/src/index/IndexFactory.h | 45 +-- .../core/src/index/InvertedIndexTantivy.cpp | 5 +- internal/core/src/index/ScalarIndexSort.h | 11 - internal/core/src/index/StringIndexMarisa.h | 11 - internal/core/src/indexbuilder/index_c.cpp | 3 +- internal/core/src/storage/Types.h | 2 +- internal/core/unittest/CMakeLists.txt | 2 +- .../core/unittest/test_array_bitmap_index.cpp | 330 ++++++++++++++++++ internal/core/unittest/test_hybrid_index.cpp | 19 +- .../core/unittest/test_inverted_index.cpp | 5 +- internal/core/unittest/test_scalar_index.cpp | 56 +-- internal/core/unittest/test_string_index.cpp | 2 +- .../test_utils/indexbuilder_test_utils.h | 7 +- .../indexparamcheck/bitmap_checker_test.go | 2 +- .../indexparamcheck/bitmap_index_checker.go | 4 +- 21 files changed, 699 insertions(+), 298 deletions(-) create mode 100644 internal/core/unittest/test_array_bitmap_index.cpp diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 4d145778287c5..6e69ddac177c0 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -258,6 +258,34 @@ IsBinaryDataType(DataType data_type) { return 
IsJsonDataType(data_type) || IsArrayDataType(data_type); } +inline bool +IsPrimitiveType(proto::schema::DataType type) { + switch (type) { + case proto::schema::DataType::Bool: + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: + case proto::schema::DataType::Int64: + case proto::schema::DataType::Float: + case proto::schema::DataType::Double: + case proto::schema::DataType::String: + case proto::schema::DataType::VarChar: + return true; + default: + return false; + } +} + +inline bool +IsJsonType(proto::schema::DataType type) { + return type == proto::schema::DataType::JSON; +} + +inline bool +IsArrayType(proto::schema::DataType type) { + return type == proto::schema::DataType::Array; +} + inline bool IsBinaryVectorDataType(DataType data_type) { return data_type == DataType::VECTOR_BINARY; diff --git a/internal/core/src/index/BitmapIndex.cpp b/internal/core/src/index/BitmapIndex.cpp index 3e63763dd2b51..1cd3d6f9a3d67 100644 --- a/internal/core/src/index/BitmapIndex.cpp +++ b/internal/core/src/index/BitmapIndex.cpp @@ -33,7 +33,8 @@ namespace index { template BitmapIndex::BitmapIndex( const storage::FileManagerContext& file_manager_context) - : is_built_(false) { + : is_built_(false), + schema_(file_manager_context.fieldDataMeta.field_schema) { if (file_manager_context.Valid()) { file_manager_ = std::make_shared(file_manager_context); @@ -45,7 +46,9 @@ template BitmapIndex::BitmapIndex( const storage::FileManagerContext& file_manager_context, std::shared_ptr space) - : is_built_(false), data_(), space_(space) { + : is_built_(false), + schema_(file_manager_context.fieldDataMeta.field_schema), + space_(space) { if (file_manager_context.Valid()) { file_manager_ = std::make_shared( file_manager_context, space); @@ -67,27 +70,7 @@ BitmapIndex::Build(const Config& config) { auto field_datas = file_manager_->CacheRawDataToMemory(insert_files.value()); - int total_num_rows = 0; - for (const auto& field_data : 
field_datas) { - total_num_rows += field_data->get_num_rows(); - } - if (total_num_rows == 0) { - throw SegcoreError(DataIsEmpty, - "scalar bitmap index can not build null values"); - } - - total_num_rows_ = total_num_rows; - - int64_t offset = 0; - for (const auto& data : field_datas) { - auto slice_row_num = data->get_num_rows(); - for (size_t i = 0; i < slice_row_num; ++i) { - auto val = reinterpret_cast(data->RawValue(i)); - data_[*val].add(offset); - offset++; - } - } - is_built_ = true; + BuildWithFieldData(field_datas); } template @@ -144,6 +127,21 @@ BitmapIndex::BuildV2(const Config& config) { BuildWithFieldData(field_datas); } +template +void +BitmapIndex::BuildPrimitiveField( + const std::vector& field_datas) { + int64_t offset = 0; + for (const auto& data : field_datas) { + auto slice_row_num = data->get_num_rows(); + for (size_t i = 0; i < slice_row_num; ++i) { + auto val = reinterpret_cast(data->RawValue(i)); + data_[*val].add(offset); + offset++; + } + } +} + template void BitmapIndex::BuildWithFieldData( @@ -158,17 +156,46 @@ BitmapIndex::BuildWithFieldData( } total_num_rows_ = total_num_rows; + switch (schema_.data_type()) { + case proto::schema::DataType::Bool: + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: + case proto::schema::DataType::Int64: + case proto::schema::DataType::Float: + case proto::schema::DataType::Double: + case proto::schema::DataType::String: + case proto::schema::DataType::VarChar: + BuildPrimitiveField(field_datas); + break; + case proto::schema::DataType::Array: + BuildArrayField(field_datas); + break; + default: + PanicInfo( + DataTypeInvalid, + fmt::format("Invalid data type: {} for build bitmap index", + proto::schema::DataType_Name(schema_.data_type()))); + } + is_built_ = true; +} + +template +void +BitmapIndex::BuildArrayField(const std::vector& field_datas) { int64_t offset = 0; for (const auto& data : field_datas) { auto slice_row_num = 
data->get_num_rows(); for (size_t i = 0; i < slice_row_num; ++i) { - auto val = reinterpret_cast(data->RawValue(i)); - data_[*val].add(offset); + auto array = + reinterpret_cast(data->RawValue(i)); + for (size_t j = 0; j < array->length(); ++j) { + auto val = array->template get_data(j); + data_[val].add(offset); + } offset++; } } - - is_built_ = true; } template @@ -877,4 +904,4 @@ template class BitmapIndex; template class BitmapIndex; } // namespace index -} // namespace milvus \ No newline at end of file +} // namespace milvus diff --git a/internal/core/src/index/BitmapIndex.h b/internal/core/src/index/BitmapIndex.h index 2ead42d5de545..6dca9c68740d7 100644 --- a/internal/core/src/index/BitmapIndex.h +++ b/internal/core/src/index/BitmapIndex.h @@ -50,17 +50,6 @@ class BitmapIndex : public ScalarIndex { const storage::FileManagerContext& file_manager_context, std::shared_ptr space); - explicit BitmapIndex( - const std::shared_ptr& file_manager) - : file_manager_(file_manager) { - } - - explicit BitmapIndex( - const std::shared_ptr& file_manager, - std::shared_ptr space) - : file_manager_(file_manager), space_(space) { - } - ~BitmapIndex() override = default; BinarySet @@ -117,6 +106,7 @@ class BitmapIndex : public ScalarIndex { BinarySet Upload(const Config& config = {}) override; + BinarySet UploadV2(const Config& config = {}) override; @@ -125,6 +115,11 @@ class BitmapIndex : public ScalarIndex { return true; } + void + LoadWithoutAssemble(const BinarySet& binary_set, + const Config& config) override; + + public: int64_t Cardinality() { if (build_mode_ == BitmapIndexBuildMode::ROARING) { @@ -134,11 +129,13 @@ class BitmapIndex : public ScalarIndex { } } + private: void - LoadWithoutAssemble(const BinarySet& binary_set, - const Config& config) override; + BuildPrimitiveField(const std::vector& datas); + + void + BuildArrayField(const std::vector& datas); - private: size_t GetIndexDataSize(); @@ -188,6 +185,7 @@ class BitmapIndex : public ScalarIndex { std::map 
data_; std::map bitsets_; size_t total_num_rows_{0}; + proto::schema::FieldSchema schema_; std::shared_ptr file_manager_; std::shared_ptr space_; }; diff --git a/internal/core/src/index/HybridScalarIndex.cpp b/internal/core/src/index/HybridScalarIndex.cpp index 518828ea7bac7..0f032d0501eb0 100644 --- a/internal/core/src/index/HybridScalarIndex.cpp +++ b/internal/core/src/index/HybridScalarIndex.cpp @@ -32,12 +32,14 @@ template HybridScalarIndex::HybridScalarIndex( const storage::FileManagerContext& file_manager_context) : is_built_(false), - bitmap_index_cardinality_limit_(DEFAULT_BITMAP_INDEX_CARDINALITY_BOUND) { + bitmap_index_cardinality_limit_(DEFAULT_BITMAP_INDEX_CARDINALITY_BOUND), + file_manager_context_(file_manager_context) { if (file_manager_context.Valid()) { - file_manager_ = + mem_file_manager_ = std::make_shared(file_manager_context); - AssertInfo(file_manager_ != nullptr, "create file manager failed!"); + AssertInfo(mem_file_manager_ != nullptr, "create file manager failed!"); } + field_type_ = file_manager_context.fieldDataMeta.field_schema.data_type(); internal_index_type_ = InternalIndexType::NONE; } @@ -47,12 +49,14 @@ HybridScalarIndex::HybridScalarIndex( std::shared_ptr space) : is_built_(false), bitmap_index_cardinality_limit_(DEFAULT_BITMAP_INDEX_CARDINALITY_BOUND), + file_manager_context_(file_manager_context), space_(space) { if (file_manager_context.Valid()) { - file_manager_ = std::make_shared( + mem_file_manager_ = std::make_shared( file_manager_context, space); - AssertInfo(file_manager_ != nullptr, "create file manager failed!"); + AssertInfo(mem_file_manager_ != nullptr, "create file manager failed!"); } + field_type_ = file_manager_context.fieldDataMeta.field_schema.data_type(); internal_index_type_ = InternalIndexType::NONE; } @@ -96,7 +100,7 @@ HybridScalarIndex::SelectIndexBuildType( template InternalIndexType -HybridScalarIndex::SelectIndexBuildType( +HybridScalarIndex::SelectBuildTypeForPrimitiveType( const std::vector& 
field_datas) { std::set distinct_vals; for (const auto& data : field_datas) { @@ -121,7 +125,7 @@ HybridScalarIndex::SelectIndexBuildType( template <> InternalIndexType -HybridScalarIndex::SelectIndexBuildType( +HybridScalarIndex::SelectBuildTypeForPrimitiveType( const std::vector& field_datas) { std::set distinct_vals; for (const auto& data : field_datas) { @@ -144,6 +148,52 @@ HybridScalarIndex::SelectIndexBuildType( return internal_index_type_; } +template +InternalIndexType +HybridScalarIndex::SelectBuildTypeForArrayType( + const std::vector& field_datas) { + std::set distinct_vals; + for (const auto& data : field_datas) { + auto slice_row_num = data->get_num_rows(); + for (size_t i = 0; i < slice_row_num; ++i) { + auto array = + reinterpret_cast(data->RawValue(i)); + for (size_t j = 0; j < array->length(); ++j) { + auto val = array->template get_data(j); + distinct_vals.insert(val); + + // Limit the bitmap index cardinality because of memory usage + if (distinct_vals.size() > bitmap_index_cardinality_limit_) { + break; + } + } + } + } + // Decide whether to select bitmap index or inverted index + if (distinct_vals.size() >= bitmap_index_cardinality_limit_) { + internal_index_type_ = InternalIndexType::INVERTED; + } else { + internal_index_type_ = InternalIndexType::BITMAP; + } + return internal_index_type_; +} + +template +InternalIndexType +HybridScalarIndex::SelectIndexBuildType( + const std::vector& field_datas) { + std::set distinct_vals; + if (IsPrimitiveType(field_type_)) { + return SelectBuildTypeForPrimitiveType(field_datas); + } else if (IsArrayType(field_type_)) { + return SelectBuildTypeForArrayType(field_datas); + } else { + PanicInfo(Unsupported, + fmt::format("unsupported build index for type {}", + DataType_Name(field_type_))); + } +} + template std::shared_ptr> HybridScalarIndex::GetInternalIndex() { @@ -151,9 +201,14 @@ HybridScalarIndex::GetInternalIndex() { return internal_index_; } if (internal_index_type_ == InternalIndexType::BITMAP) { - 
internal_index_ = std::make_shared>(file_manager_); + internal_index_ = + std::make_shared>(file_manager_context_); } else if (internal_index_type_ == InternalIndexType::STLSORT) { - internal_index_ = std::make_shared>(file_manager_); + internal_index_ = + std::make_shared>(file_manager_context_); + } else if (internal_index_type_ == InternalIndexType::INVERTED) { + internal_index_ = + std::make_shared>(file_manager_context_); } else { PanicInfo(UnexpectedError, "unknown index type when get internal index"); @@ -170,9 +225,13 @@ HybridScalarIndex::GetInternalIndex() { if (internal_index_type_ == InternalIndexType::BITMAP) { internal_index_ = - std::make_shared>(file_manager_); + std::make_shared>(file_manager_context_); } else if (internal_index_type_ == InternalIndexType::MARISA) { - internal_index_ = std::make_shared(file_manager_); + internal_index_ = + std::make_shared(file_manager_context_); + } else if (internal_index_type_ == InternalIndexType::INVERTED) { + internal_index_ = std::make_shared>( + file_manager_context_); } else { PanicInfo(UnexpectedError, "unknown index type when get internal index"); @@ -206,7 +265,7 @@ HybridScalarIndex::Build(const Config& config) { "insert file paths is empty when build index"); auto field_datas = - file_manager_->CacheRawDataToMemory(insert_files.value()); + mem_file_manager_->CacheRawDataToMemory(insert_files.value()); SelectIndexBuildType(field_datas); BuildInternal(field_datas); @@ -224,7 +283,7 @@ HybridScalarIndex::BuildV2(const Config& config) { LOG_INFO("config bitmap cardinality limit to {}", bitmap_index_cardinality_limit_); - auto field_name = file_manager_->GetIndexMeta().field_name; + auto field_name = mem_file_manager_->GetIndexMeta().field_name; auto reader = space_->ScanData(); std::vector field_datas; for (auto rec = reader->Next(); rec != nullptr; rec = reader->Next()) { @@ -262,32 +321,51 @@ HybridScalarIndex::Serialize(const Config& config) { template BinarySet -HybridScalarIndex::Upload(const Config& 
config) { - auto binary_set = Serialize(config); - file_manager_->AddFile(binary_set); +HybridScalarIndex::SerializeIndexType() { + // Add index type info to storage for future restruct index + BinarySet index_binary_set; + std::shared_ptr index_type_buf(new uint8_t[sizeof(uint8_t)]); + index_type_buf[0] = static_cast(internal_index_type_); + index_binary_set.Append(index::INDEX_TYPE, index_type_buf, sizeof(uint8_t)); + mem_file_manager_->AddFile(index_binary_set); - auto remote_paths_to_size = file_manager_->GetRemotePathsToFileSize(); - BinarySet ret; + auto remote_paths_to_size = mem_file_manager_->GetRemotePathsToFileSize(); + BinarySet ret_set; + Assert(remote_paths_to_size.size() == 1); for (auto& file : remote_paths_to_size) { - ret.Append(file.first, nullptr, file.second); + ret_set.Append(file.first, nullptr, file.second); } + return ret_set; +} - return ret; +template +BinarySet +HybridScalarIndex::Upload(const Config& config) { + auto internal_index = GetInternalIndex(); + auto index_ret = internal_index->Upload(config); + + auto index_type_ret = SerializeIndexType(); + + for (auto& [key, value] : index_type_ret.binary_map_) { + index_ret.Append(key, value); + } + + return index_ret; } template BinarySet HybridScalarIndex::UploadV2(const Config& config) { - auto binary_set = Serialize(config); - file_manager_->AddFileV2(binary_set); + auto internal_index = GetInternalIndex(); + auto index_ret = internal_index->Upload(config); - auto remote_paths_to_size = file_manager_->GetRemotePathsToFileSize(); - BinarySet ret; - for (auto& file : remote_paths_to_size) { - ret.Append(file.first, nullptr, file.second); + auto index_type_ret = SerializeIndexType(); + + for (auto& [key, value] : index_type_ret.binary_map_) { + index_ret.Append(key, value); } - return ret; + return index_ret; } template @@ -301,64 +379,32 @@ HybridScalarIndex::DeserializeIndexType(const BinarySet& binary_set) { template void -HybridScalarIndex::LoadInternal(const BinarySet& binary_set, - 
const Config& config) { - auto index = GetInternalIndex(); - index->LoadWithoutAssemble(binary_set, config); +HybridScalarIndex::LoadV2(const Config& config) { + PanicInfo(Unsupported, "HybridScalarIndex LoadV2 not implemented"); } template -void -HybridScalarIndex::Load(const BinarySet& binary_set, const Config& config) { - milvus::Assemble(const_cast(binary_set)); - DeserializeIndexType(binary_set); - - LoadInternal(binary_set, config); - is_built_ = true; +std::string +HybridScalarIndex::GetRemoteIndexTypeFile( + const std::vector& files) { + std::string ret; + for (auto& file : files) { + auto file_name = file.substr(file.find_last_of('/') + 1); + if (file_name == index::INDEX_TYPE) { + ret = file; + } + } + AssertInfo(!ret.empty(), "index type file not found for hybrid index"); + return ret; } template void -HybridScalarIndex::LoadV2(const Config& config) { - auto blobs = space_->StatisticsBlobs(); - std::vector index_files; - auto prefix = file_manager_->GetRemoteIndexObjectPrefixV2(); - for (auto& b : blobs) { - if (b.name.rfind(prefix, 0) == 0) { - index_files.push_back(b.name); - } - } - std::map index_datas{}; - for (auto& file_name : index_files) { - auto res = space_->GetBlobByteSize(file_name); - if (!res.ok()) { - PanicInfo(S3Error, "unable to read index blob"); - } - auto index_blob_data = - std::shared_ptr(new uint8_t[res.value()]); - auto status = space_->ReadBlob(file_name, index_blob_data.get()); - if (!status.ok()) { - PanicInfo(S3Error, "unable to read index blob"); - } - auto raw_index_blob = - storage::DeserializeFileData(index_blob_data, res.value()); - auto key = file_name.substr(file_name.find_last_of('/') + 1); - index_datas[key] = raw_index_blob->GetFieldData(); - } - AssembleIndexDatas(index_datas); - - BinarySet binary_set; - for (auto& [key, data] : index_datas) { - auto size = data->Size(); - auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction - auto buf = std::shared_ptr( - (uint8_t*)const_cast(data->Data()), deleter); 
- binary_set.Append(key, buf, size); - } - +HybridScalarIndex::Load(const BinarySet& binary_set, const Config& config) { DeserializeIndexType(binary_set); - LoadInternal(binary_set, config); + auto index = GetInternalIndex(); + index->Load(binary_set, config); is_built_ = true; } @@ -371,7 +417,11 @@ HybridScalarIndex::Load(milvus::tracer::TraceContext ctx, GetValueFromConfig>(config, "index_files"); AssertInfo(index_files.has_value(), "index file paths is empty when load bitmap index"); - auto index_datas = file_manager_->LoadIndexToMemory(index_files.value()); + + auto index_type_file = GetRemoteIndexTypeFile(index_files.value()); + + auto index_datas = mem_file_manager_->LoadIndexToMemory( + std::vector{index_type_file}); AssembleIndexDatas(index_datas); BinarySet binary_set; for (auto& [key, data] : index_datas) { @@ -384,7 +434,8 @@ HybridScalarIndex::Load(milvus::tracer::TraceContext ctx, DeserializeIndexType(binary_set); - LoadInternal(binary_set, config); + auto index = GetInternalIndex(); + index->Load(ctx, config); is_built_ = true; } diff --git a/internal/core/src/index/HybridScalarIndex.h b/internal/core/src/index/HybridScalarIndex.h index c3c44630bf846..2683eeff9d579 100644 --- a/internal/core/src/index/HybridScalarIndex.h +++ b/internal/core/src/index/HybridScalarIndex.h @@ -24,6 +24,7 @@ #include "index/BitmapIndex.h" #include "index/ScalarIndexSort.h" #include "index/StringIndexMarisa.h" +#include "index/InvertedIndexTantivy.h" #include "storage/FileManager.h" #include "storage/DiskFileManagerImpl.h" #include "storage/MemFileManagerImpl.h" @@ -37,6 +38,7 @@ enum class InternalIndexType { BITMAP, STLSORT, MARISA, + INVERTED, }; /* @@ -125,6 +127,9 @@ class HybridScalarIndex : public ScalarIndex { const bool HasRawData() const override { + if (field_type_ == proto::schema::DataType::Array) { + return false; + } return internal_index_->HasRawData(); } @@ -135,30 +140,42 @@ class HybridScalarIndex : public ScalarIndex { UploadV2(const Config& config = 
{}) override; private: + InternalIndexType + SelectBuildTypeForPrimitiveType( + const std::vector& field_datas); + + InternalIndexType + SelectBuildTypeForArrayType(const std::vector& field_datas); + InternalIndexType SelectIndexBuildType(const std::vector& field_datas); InternalIndexType SelectIndexBuildType(size_t n, const T* values); + BinarySet + SerializeIndexType(); + void DeserializeIndexType(const BinarySet& binary_set); void BuildInternal(const std::vector& field_datas); - void - LoadInternal(const BinarySet& binary_set, const Config& config); - std::shared_ptr> GetInternalIndex(); + std::string + GetRemoteIndexTypeFile(const std::vector& files); + public: bool is_built_{false}; int32_t bitmap_index_cardinality_limit_; + proto::schema::DataType field_type_; InternalIndexType internal_index_type_; std::shared_ptr> internal_index_{nullptr}; - std::shared_ptr file_manager_{nullptr}; + storage::FileManagerContext file_manager_context_; + std::shared_ptr mem_file_manager_{nullptr}; std::shared_ptr space_{nullptr}; }; diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index fdfdda7226ba5..ac4a89f933299 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -33,7 +33,7 @@ namespace milvus::index { template ScalarIndexPtr -IndexFactory::CreateScalarIndex( +IndexFactory::CreatePrimitiveScalarIndex( const IndexType& index_type, const storage::FileManagerContext& file_manager_context) { if (index_type == INVERTED_INDEX_TYPE) { @@ -54,7 +54,7 @@ IndexFactory::CreateScalarIndex( template <> ScalarIndexPtr -IndexFactory::CreateScalarIndex( +IndexFactory::CreatePrimitiveScalarIndex( const IndexType& index_type, const storage::FileManagerContext& file_manager_context) { #if defined(__linux__) || defined(__APPLE__) @@ -74,7 +74,7 @@ IndexFactory::CreateScalarIndex( template ScalarIndexPtr -IndexFactory::CreateScalarIndex( +IndexFactory::CreatePrimitiveScalarIndex( const 
IndexType& index_type, const storage::FileManagerContext& file_manager_context, std::shared_ptr space) { @@ -91,7 +91,7 @@ IndexFactory::CreateScalarIndex( template <> ScalarIndexPtr -IndexFactory::CreateScalarIndex( +IndexFactory::CreatePrimitiveScalarIndex( const IndexType& index_type, const storage::FileManagerContext& file_manager_context, std::shared_ptr space) { @@ -142,25 +142,32 @@ IndexFactory::CreatePrimitiveScalarIndex( switch (data_type) { // create scalar index case DataType::BOOL: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); case DataType::INT8: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); case DataType::INT16: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); case DataType::INT32: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); case DataType::INT64: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); case DataType::FLOAT: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); case DataType::DOUBLE: - return CreateScalarIndex(index_type, file_manager_context); + return CreatePrimitiveScalarIndex(index_type, + file_manager_context); // create string index case DataType::STRING: case DataType::VARCHAR: - return CreateScalarIndex(index_type, - file_manager_context); + return CreatePrimitiveScalarIndex( + index_type, file_manager_context); default: throw SegcoreError( DataTypeInvalid, @@ -168,21 +175,57 @@ IndexFactory::CreatePrimitiveScalarIndex( } } +IndexBasePtr +IndexFactory::CreateCompositeScalarIndex( + IndexType index_type, + const 
storage::FileManagerContext& file_manager_context) { + if (index_type == BITMAP_INDEX_TYPE) { + auto element_type = static_cast( + file_manager_context.fieldDataMeta.field_schema.element_type()); + return CreatePrimitiveScalarIndex( + element_type, index_type, file_manager_context); + } else if (index_type == INVERTED_INDEX_TYPE) { + auto element_type = static_cast( + file_manager_context.fieldDataMeta.field_schema.element_type()); + return CreatePrimitiveScalarIndex( + element_type, index_type, file_manager_context); + } +} + +IndexBasePtr +IndexFactory::CreateComplexScalarIndex( + IndexType index_type, + const storage::FileManagerContext& file_manager_context) { + PanicInfo(Unsupported, "Complex index not supported now"); +} + IndexBasePtr IndexFactory::CreateScalarIndex( const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context) { - switch (create_index_info.field_type) { - case DataType::ARRAY: + auto data_type = create_index_info.field_type; + switch (data_type) { + case DataType::BOOL: + case DataType::INT8: + case DataType::INT16: + case DataType::INT32: + case DataType::INT64: + case DataType::FLOAT: + case DataType::DOUBLE: + case DataType::VARCHAR: + case DataType::STRING: return CreatePrimitiveScalarIndex( - static_cast( - file_manager_context.fieldDataMeta.schema.element_type()), - create_index_info.index_type, - file_manager_context); - default: - return CreatePrimitiveScalarIndex(create_index_info.field_type, - create_index_info.index_type, + data_type, create_index_info.index_type, file_manager_context); + case DataType::ARRAY: { + return CreateCompositeScalarIndex(create_index_info.index_type, file_manager_context); + } + case DataType::JSON: { + return CreateComplexScalarIndex(create_index_info.index_type, + file_manager_context); + } + default: + PanicInfo(DataTypeInvalid, "Invalid data type:{}", data_type); } } @@ -251,43 +294,6 @@ IndexFactory::CreateVectorIndex( } } -IndexBasePtr 
-IndexFactory::CreateScalarIndex(const CreateIndexInfo& create_index_info, - const storage::FileManagerContext& file_manager, - std::shared_ptr space) { - auto data_type = create_index_info.field_type; - auto index_type = create_index_info.index_type; - - switch (data_type) { - // create scalar index - case DataType::BOOL: - return CreateScalarIndex(index_type, file_manager, space); - case DataType::INT8: - return CreateScalarIndex(index_type, file_manager, space); - case DataType::INT16: - return CreateScalarIndex(index_type, file_manager, space); - case DataType::INT32: - return CreateScalarIndex(index_type, file_manager, space); - case DataType::INT64: - return CreateScalarIndex(index_type, file_manager, space); - case DataType::FLOAT: - return CreateScalarIndex(index_type, file_manager, space); - case DataType::DOUBLE: - return CreateScalarIndex(index_type, file_manager, space); - - // create string index - case DataType::STRING: - case DataType::VARCHAR: - return CreateScalarIndex( - index_type, file_manager, space); - default: - throw SegcoreError( - DataTypeInvalid, - fmt::format("invalid data type to build mem index: {}", - data_type)); - } -} - IndexBasePtr IndexFactory::CreateVectorIndex( const CreateIndexInfo& create_index_info, diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index 47b255ab4e912..61c5119d4ca18 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -65,6 +65,7 @@ class IndexFactory { CreateVectorIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context); + // For base types like int, float, double, string, etc IndexBasePtr CreatePrimitiveScalarIndex( DataType data_type, @@ -72,6 +73,20 @@ class IndexFactory { const storage::FileManagerContext& file_manager_context = storage::FileManagerContext()); + // For types like array, struct, union, etc + IndexBasePtr + CreateCompositeScalarIndex( + IndexType 
index_type, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext()); + + // For types like Json, XML, etc + IndexBasePtr + CreateComplexScalarIndex( + IndexType index_type, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext()); + IndexBasePtr CreateScalarIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context = @@ -85,7 +100,10 @@ class IndexFactory { IndexBasePtr CreateScalarIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context, - std::shared_ptr space); + std::shared_ptr space) { + PanicInfo(ErrorCode::Unsupported, + "CreateScalarIndexV2 not implemented"); + } // IndexBasePtr // CreateIndex(DataType dtype, const IndexType& index_type); @@ -94,28 +112,15 @@ class IndexFactory { template ScalarIndexPtr - CreateScalarIndex(const IndexType& index_type, - const storage::FileManagerContext& file_manager = - storage::FileManagerContext()); + CreatePrimitiveScalarIndex(const IndexType& index_type, + const storage::FileManagerContext& file_manager = + storage::FileManagerContext()); template ScalarIndexPtr - CreateScalarIndex(const IndexType& index_type, - const storage::FileManagerContext& file_manager, - std::shared_ptr space); + CreatePrimitiveScalarIndex(const IndexType& index_type, + const storage::FileManagerContext& file_manager, + std::shared_ptr space); }; -// template <> -// ScalarIndexPtr -// IndexFactory::CreateScalarIndex( -// const IndexType& index_type, -// const storage::FileManagerContext& file_manager_context, -// DataType d_type); - -template <> -ScalarIndexPtr -IndexFactory::CreateScalarIndex( - const IndexType& index_type, - const storage::FileManagerContext& file_manager_context, - std::shared_ptr space); } // namespace milvus::index diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 3b9a54fae940b..984dc19466b10 
100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -66,7 +66,7 @@ template InvertedIndexTantivy::InvertedIndexTantivy( const storage::FileManagerContext& ctx, std::shared_ptr space) - : space_(space), schema_(ctx.fieldDataMeta.schema) { + : space_(space), schema_(ctx.fieldDataMeta.field_schema) { mem_file_manager_ = std::make_shared(ctx, ctx.space_); disk_file_manager_ = std::make_shared(ctx, ctx.space_); auto field = @@ -259,8 +259,7 @@ InvertedIndexTantivy::InApplyCallback( template const TargetBitmap InvertedIndexTantivy::NotIn(size_t n, const T* values) { - TargetBitmap bitset(Count()); - bitset.set(); + TargetBitmap bitset(Count(), true); for (size_t i = 0; i < n; ++i) { auto array = wrapper_->term_query(values[i]); apply_hits(bitset, array, false); diff --git a/internal/core/src/index/ScalarIndexSort.h b/internal/core/src/index/ScalarIndexSort.h index 96402017c9cfe..ca44045c93ea9 100644 --- a/internal/core/src/index/ScalarIndexSort.h +++ b/internal/core/src/index/ScalarIndexSort.h @@ -41,17 +41,6 @@ class ScalarIndexSort : public ScalarIndex { const storage::FileManagerContext& file_manager_context, std::shared_ptr space); - explicit ScalarIndexSort( - const std::shared_ptr& file_manager) - : file_manager_(file_manager) { - } - - explicit ScalarIndexSort( - const std::shared_ptr& file_manager, - std::shared_ptr space) - : file_manager_(file_manager), space_(space) { - } - BinarySet Serialize(const Config& config) override; diff --git a/internal/core/src/index/StringIndexMarisa.h b/internal/core/src/index/StringIndexMarisa.h index e787a7e63b404..214635280e8c4 100644 --- a/internal/core/src/index/StringIndexMarisa.h +++ b/internal/core/src/index/StringIndexMarisa.h @@ -37,17 +37,6 @@ class StringIndexMarisa : public StringIndex { const storage::FileManagerContext& file_manager_context, std::shared_ptr space); - explicit StringIndexMarisa( - const std::shared_ptr& file_manager) - : 
file_manager_(file_manager) { - } - - explicit StringIndexMarisa( - const std::shared_ptr& file_manager, - std::shared_ptr space) - : file_manager_(file_manager), space_(space) { - } - int64_t Size() override; diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 7ccaf7c414a24..84f781e589cca 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -274,7 +274,8 @@ CreateIndexV2(CIndex* res_index, build_index_info->collectionid(), build_index_info->partitionid(), build_index_info->segmentid(), - build_index_info->field_schema().fieldid()}; + build_index_info->field_schema().fieldid(), + build_index_info->field_schema()}; milvus::storage::IndexMeta index_meta{ build_index_info->segmentid(), build_index_info->field_schema().fieldid(), diff --git a/internal/core/src/storage/Types.h b/internal/core/src/storage/Types.h index fbd72d0a59a78..928386d19078a 100644 --- a/internal/core/src/storage/Types.h +++ b/internal/core/src/storage/Types.h @@ -64,7 +64,7 @@ struct FieldDataMeta { int64_t partition_id; int64_t segment_id; int64_t field_id; - proto::schema::FieldSchema schema; + proto::schema::FieldSchema field_schema; }; enum CodecType { diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 62c68e6dcb33c..43da214d5c965 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -20,7 +20,6 @@ set(MILVUS_TEST_FILES test_bf.cpp test_bf_sparse.cpp test_binary.cpp - test_bitmap.cpp test_bool_index.cpp test_common.cpp test_concurrent_vector.cpp @@ -33,6 +32,7 @@ set(MILVUS_TEST_FILES test_growing_index.cpp test_indexing.cpp test_hybrid_index.cpp + test_array_bitmap_index.cpp test_index_c_api.cpp test_index_wrapper.cpp test_init.cpp diff --git a/internal/core/unittest/test_array_bitmap_index.cpp b/internal/core/unittest/test_array_bitmap_index.cpp new file mode 100644 index 
0000000000000..6f86c30e7e948 --- /dev/null +++ b/internal/core/unittest/test_array_bitmap_index.cpp @@ -0,0 +1,330 @@ +// Copyright(C) 2019 - 2020 Zilliz.All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include +#include +#include + +#include "common/Tracer.h" +#include "index/BitmapIndex.h" +#include "storage/Util.h" +#include "storage/InsertData.h" +#include "indexbuilder/IndexFactory.h" +#include "index/IndexFactory.h" +#include "test_utils/indexbuilder_test_utils.h" +#include "index/Meta.h" +#include "pb/schema.pb.h" + +using namespace milvus::index; +using namespace milvus::indexbuilder; +using namespace milvus; +using namespace milvus::index; + +template +static std::vector +GenerateData(const size_t size, const size_t cardinality) { + std::vector result; + for (size_t i = 0; i < size; ++i) { + result.push_back(rand() % cardinality); + } + return result; +} + +template <> +std::vector +GenerateData(const size_t size, const size_t cardinality) { + std::vector result; + for (size_t i = 0; i < size; ++i) { + result.push_back(rand() % 2 == 0); + } + return result; +} + +template <> +std::vector +GenerateData(const size_t size, const size_t cardinality) { + std::vector result; + for (size_t i = 0; i < size; ++i) { + result.push_back(std::to_string(rand() % cardinality)); + } + return result; +} + +std::vector +GenerateArrayData(proto::schema::DataType element_type, + int cardinality, + int size, + int array_len) { + 
std::vector data(size); + switch (element_type) { + case proto::schema::DataType::Bool: { + for (int i = 0; i < size; i++) { + milvus::proto::schema::ScalarField field_data; + for (int j = 0; j < array_len; j++) { + field_data.mutable_bool_data()->add_data( + static_cast(random())); + } + data[i] = field_data; + } + break; + } + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: { + for (int i = 0; i < size; i++) { + milvus::proto::schema::ScalarField field_data; + + for (int j = 0; j < array_len; j++) { + field_data.mutable_int_data()->add_data( + static_cast(random() % cardinality)); + } + data[i] = field_data; + } + break; + } + case proto::schema::DataType::Int64: { + for (int i = 0; i < size; i++) { + milvus::proto::schema::ScalarField field_data; + for (int j = 0; j < array_len; j++) { + field_data.mutable_long_data()->add_data( + static_cast(random() % cardinality)); + } + data[i] = field_data; + } + break; + } + case proto::schema::DataType::String: { + for (int i = 0; i < size; i++) { + milvus::proto::schema::ScalarField field_data; + + for (int j = 0; j < array_len; j++) { + field_data.mutable_string_data()->add_data( + std::to_string(random() % cardinality)); + } + data[i] = field_data; + } + break; + } + case proto::schema::DataType::Float: { + for (int i = 0; i < size; i++) { + milvus::proto::schema::ScalarField field_data; + + for (int j = 0; j < array_len; j++) { + field_data.mutable_float_data()->add_data( + static_cast(random() % cardinality)); + } + data[i] = field_data; + } + break; + } + case proto::schema::DataType::Double: { + for (int i = 0; i < size; i++) { + milvus::proto::schema::ScalarField field_data; + + for (int j = 0; j < array_len; j++) { + field_data.mutable_double_data()->add_data( + static_cast(random() % cardinality)); + } + data[i] = field_data; + } + break; + } + default: { + throw std::runtime_error("unsupported data type"); + } + } + std::vector res; + for (int i 
= 0; i < size; i++) { + res.push_back(milvus::Array(data[i])); + } + return res; +} + +template +class ArrayBitmapIndexTest : public testing::Test { + protected: + void + Init(int64_t collection_id, + int64_t partition_id, + int64_t segment_id, + int64_t field_id, + int64_t index_build_id, + int64_t index_version) { + proto::schema::FieldSchema field_schema; + field_schema.set_data_type(proto::schema::DataType::Array); + proto::schema::DataType element_type; + if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::Int8; + } else if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::Int16; + } else if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::Int32; + } else if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::Int64; + } else if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::Float; + } else if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::Double; + } else if constexpr (std::is_same_v) { + element_type = proto::schema::DataType::String; + } + field_schema.set_element_type(element_type); + auto field_meta = storage::FieldDataMeta{ + collection_id, partition_id, segment_id, field_id, field_schema}; + auto index_meta = storage::IndexMeta{ + segment_id, field_id, index_build_id, index_version}; + + data_ = GenerateArrayData(element_type, cardinality_, nb_, 10); + + auto field_data = storage::CreateFieldData(DataType::ARRAY); + field_data->FillFieldData(data_.data(), data_.size()); + storage::InsertData insert_data(field_data); + insert_data.SetFieldDataMeta(field_meta); + insert_data.SetTimestamps(0, 100); + + auto serialized_bytes = insert_data.Serialize(storage::Remote); + + auto log_path = fmt::format("{}/{}/{}/{}/{}/{}", + "test_array_bitmap", + collection_id, + partition_id, + segment_id, + field_id, + 0); + chunk_manager_->Write( + log_path, serialized_bytes.data(), serialized_bytes.size()); + + storage::FileManagerContext 
ctx(field_meta, index_meta, chunk_manager_); + std::vector index_files; + + Config config; + config["index_type"] = milvus::index::BITMAP_INDEX_TYPE; + config["insert_files"] = std::vector{log_path}; + config["bitmap_cardinality_limit"] = "1000"; + + auto build_index = + indexbuilder::IndexFactory::GetInstance().CreateIndex( + DataType::ARRAY, config, ctx); + build_index->Build(); + + auto binary_set = build_index->Upload(); + for (const auto& [key, _] : binary_set.binary_map_) { + index_files.push_back(key); + } + + index::CreateIndexInfo index_info{}; + index_info.index_type = milvus::index::BITMAP_INDEX_TYPE; + index_info.field_type = DataType::ARRAY; + + config["index_files"] = index_files; + + index_ = + index::IndexFactory::GetInstance().CreateIndex(index_info, ctx); + index_->Load(milvus::tracer::TraceContext{}, config); + } + + void + SetUp() override { + nb_ = 10000; + cardinality_ = 30; + + // if constexpr (std::is_same_v) { + // type_ = DataType::INT8; + // } else if constexpr (std::is_same_v) { + // type_ = DataType::INT16; + // } else if constexpr (std::is_same_v) { + // type_ = DataType::INT32; + // } else if constexpr (std::is_same_v) { + // type_ = DataType::INT64; + // } else if constexpr (std::is_same_v) { + // type_ = DataType::VARCHAR; + // } + int64_t collection_id = 1; + int64_t partition_id = 2; + int64_t segment_id = 3; + int64_t field_id = 101; + int64_t index_build_id = 1000; + int64_t index_version = 10000; + std::string root_path = "/tmp/test-bitmap-index/"; + + storage::StorageConfig storage_config; + storage_config.storage_type = "local"; + storage_config.root_path = root_path; + chunk_manager_ = storage::CreateChunkManager(storage_config); + + Init(collection_id, + partition_id, + segment_id, + field_id, + index_build_id, + index_version); + } + + virtual ~ArrayBitmapIndexTest() override { + boost::filesystem::remove_all(chunk_manager_->GetRootPath()); + } + + public: + void + TestInFunc() { + // boost::container::vector test_data; + 
// std::unordered_set s; + // size_t nq = 10; + // for (size_t i = 0; i < nq; i++) { + // test_data.push_back(data_[i]); + // s.insert(data_[i]); + // } + // auto index_ptr = dynamic_cast*>(index_.get()); + // auto bitset = index_ptr->In(test_data.size(), test_data.data()); + // for (size_t i = 0; i < bitset.size(); i++) { + // ASSERT_EQ(bitset[i], s.find(data_[i]) != s.end()); + // } + } + + private: + std::shared_ptr chunk_manager_; + + public: + DataType type_; + IndexBasePtr index_; + size_t nb_; + size_t cardinality_; + std::vector data_; +}; + +TYPED_TEST_SUITE_P(ArrayBitmapIndexTest); + +TYPED_TEST_P(ArrayBitmapIndexTest, CountFuncTest) { + auto count = this->index_->Count(); + EXPECT_EQ(count, this->nb_); +} + +TYPED_TEST_P(ArrayBitmapIndexTest, INFuncTest) { + // this->TestInFunc(); +} + +TYPED_TEST_P(ArrayBitmapIndexTest, NotINFuncTest) { + //this->TestNotInFunc(); +} + +using BitmapType = + testing::Types; + +REGISTER_TYPED_TEST_SUITE_P(ArrayBitmapIndexTest, + CountFuncTest, + INFuncTest, + NotINFuncTest); + +INSTANTIATE_TYPED_TEST_SUITE_P(ArrayBitmapE2ECheck, + ArrayBitmapIndexTest, + BitmapType); diff --git a/internal/core/unittest/test_hybrid_index.cpp b/internal/core/unittest/test_hybrid_index.cpp index 42087199300df..1f6ea6aef8fbb 100644 --- a/internal/core/unittest/test_hybrid_index.cpp +++ b/internal/core/unittest/test_hybrid_index.cpp @@ -24,6 +24,7 @@ #include "index/IndexFactory.h" #include "test_utils/indexbuilder_test_utils.h" #include "index/Meta.h" +#include "pb/schema.pb.h" using namespace milvus::index; using namespace milvus::indexbuilder; @@ -70,8 +71,24 @@ class HybridIndexTestV1 : public testing::Test { int64_t field_id, int64_t index_build_id, int64_t index_version) { + proto::schema::FieldSchema field_schema; + if constexpr (std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::Int8); + } else if constexpr (std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::Int16); + } else if constexpr 
(std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::Int32); + } else if constexpr (std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::Int64); + } else if constexpr (std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::Float); + } else if constexpr (std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::Double); + } else if constexpr (std::is_same_v) { + field_schema.set_data_type(proto::schema::DataType::String); + } auto field_meta = storage::FieldDataMeta{ - collection_id, partition_id, segment_id, field_id}; + collection_id, partition_id, segment_id, field_id, field_schema}; auto index_meta = storage::IndexMeta{ segment_id, field_id, index_build_id, index_version}; diff --git a/internal/core/unittest/test_inverted_index.cpp b/internal/core/unittest/test_inverted_index.cpp index c8b9bf3663235..83d3a65673174 100644 --- a/internal/core/unittest/test_inverted_index.cpp +++ b/internal/core/unittest/test_inverted_index.cpp @@ -40,8 +40,9 @@ gen_field_meta(int64_t collection_id = 1, .segment_id = segment_id, .field_id = field_id, }; - meta.schema.set_data_type(static_cast(data_type)); - meta.schema.set_element_type( + meta.field_schema.set_data_type( + static_cast(data_type)); + meta.field_schema.set_element_type( static_cast(element_type)); return meta; } diff --git a/internal/core/unittest/test_scalar_index.cpp b/internal/core/unittest/test_scalar_index.cpp index 9a99bec26a272..2d3e6bb213af1 100644 --- a/internal/core/unittest/test_scalar_index.cpp +++ b/internal/core/unittest/test_scalar_index.cpp @@ -56,7 +56,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Dummy) { auto GetTempFileManagerCtx(CDataType data_type) { auto ctx = milvus::storage::FileManagerContext(); - ctx.fieldDataMeta.schema.set_data_type( + ctx.fieldDataMeta.field_schema.set_data_type( static_cast(data_type)); return ctx; } @@ -356,60 +356,6 @@ struct TypedScalarIndexTestV2::Helper { using C = arrow::DoubleType; }; 
-TYPED_TEST_SUITE_P(TypedScalarIndexTestV2); - -TYPED_TEST_P(TypedScalarIndexTestV2, Base) { - using T = TypeParam; - auto dtype = milvus::GetDType(); - auto index_types = GetIndexTypesV2(); - for (const auto& index_type : index_types) { - milvus::index::CreateIndexInfo create_index_info; - create_index_info.field_type = milvus::DataType(dtype); - create_index_info.index_type = index_type; - create_index_info.field_name = "scalar"; - - auto storage_config = get_default_local_storage_config(); - auto chunk_manager = - milvus::storage::CreateChunkManager(storage_config); - - milvus::test::TmpPath tmp_path; - auto temp_path = tmp_path.get(); - auto vec_size = DIM * 4; - auto dataset = GenDataset(nb, knowhere::metric::L2, false); - auto scalars = GenSortedArr(nb); - auto space = TestSpace(temp_path, vec_size, dataset, scalars); - milvus::storage::FileManagerContext file_manager_context( - {}, {.field_name = "scalar"}, chunk_manager, space); - file_manager_context.fieldDataMeta.schema.set_data_type( - static_cast(dtype)); - auto index = - milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info, file_manager_context, space); - auto scalar_index = - dynamic_cast*>(index.get()); - milvus::Config config; - if (index_type == "BITMAP") { - config["bitmap_cardinality_limit"] = "1000"; - } - scalar_index->BuildV2(config); - scalar_index->UploadV2(); - - auto new_index = - milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info, file_manager_context, space); - auto new_scalar_index = - dynamic_cast*>(new_index.get()); - new_scalar_index->LoadV2(); - ASSERT_EQ(nb, new_scalar_index->Count()); - } -} - -REGISTER_TYPED_TEST_SUITE_P(TypedScalarIndexTestV2, Base); - -INSTANTIATE_TYPED_TEST_SUITE_P(ArithmeticCheck, - TypedScalarIndexTestV2, - ScalarT); - using namespace milvus::index; template std::vector diff --git a/internal/core/unittest/test_string_index.cpp b/internal/core/unittest/test_string_index.cpp index 
f26a59645cbe1..bd006a5caf854 100644 --- a/internal/core/unittest/test_string_index.cpp +++ b/internal/core/unittest/test_string_index.cpp @@ -123,7 +123,7 @@ TEST_F(StringIndexMarisaTest, Reverse) { auto index_types = GetIndexTypes(); for (const auto& index_type : index_types) { auto index = milvus::index::IndexFactory::GetInstance() - .CreateScalarIndex(index_type); + .CreatePrimitiveScalarIndex(index_type); index->Build(nb, strs.data()); assert_reverse(index.get(), strs); } diff --git a/internal/core/unittest/test_utils/indexbuilder_test_utils.h b/internal/core/unittest/test_utils/indexbuilder_test_utils.h index 8581c0453c8d0..a02c5cfe3b19a 100644 --- a/internal/core/unittest/test_utils/indexbuilder_test_utils.h +++ b/internal/core/unittest/test_utils/indexbuilder_test_utils.h @@ -491,17 +491,14 @@ GetIndexTypes() { template inline std::vector GetIndexTypesV2() { - return std::vector{"sort", - milvus::index::INVERTED_INDEX_TYPE, - milvus::index::BITMAP_INDEX_TYPE}; + return std::vector{"sort", milvus::index::INVERTED_INDEX_TYPE}; } template <> inline std::vector GetIndexTypesV2() { return std::vector{"marisa", - milvus::index::INVERTED_INDEX_TYPE, - milvus::index::BITMAP_INDEX_TYPE}; + milvus::index::INVERTED_INDEX_TYPE}; } } // namespace diff --git a/pkg/util/indexparamcheck/bitmap_checker_test.go b/pkg/util/indexparamcheck/bitmap_checker_test.go index aa1baa8963433..7f1bb38986a85 100644 --- a/pkg/util/indexparamcheck/bitmap_checker_test.go +++ b/pkg/util/indexparamcheck/bitmap_checker_test.go @@ -16,9 +16,9 @@ func Test_BitmapIndexChecker(t *testing.T) { assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Int64)) assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Float)) assert.NoError(t, c.CheckValidDataType(schemapb.DataType_String)) + assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Array)) assert.Error(t, c.CheckValidDataType(schemapb.DataType_JSON)) - assert.Error(t, c.CheckValidDataType(schemapb.DataType_Array)) assert.Error(t, 
c.CheckTrain(map[string]string{})) assert.Error(t, c.CheckTrain(map[string]string{"bitmap_cardinality_limit": "0"})) } diff --git a/pkg/util/indexparamcheck/bitmap_index_checker.go b/pkg/util/indexparamcheck/bitmap_index_checker.go index d41267987d860..3b9be2786e3b5 100644 --- a/pkg/util/indexparamcheck/bitmap_index_checker.go +++ b/pkg/util/indexparamcheck/bitmap_index_checker.go @@ -21,8 +21,8 @@ func (c *BITMAPChecker) CheckTrain(params map[string]string) error { } func (c *BITMAPChecker) CheckValidDataType(dType schemapb.DataType) error { - if !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) { - return fmt.Errorf("bitmap index are only supported on numeric and string field") + if !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) && !typeutil.IsArrayType(dType) { + return fmt.Errorf("bitmap index are only supported on numeric, string and array field") } return nil } From 8537f3daeb144d26a0da28612e7e0d38b34c9572 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sun, 16 Jun 2024 22:07:57 +0800 Subject: [PATCH 05/10] enhance: Rename Compaction to CompactionV2 (#33858) Due to the removal of injection and syncSegments from the compaction, we need to ensure that no compaction is successfully executed during the rolling upgrade. This PR renames Compaction to CompactionV2, with the following effects: - New datacoord + old datanode: Utilizes the CompactionV2 interface, resulting in the datanode error "CompactionV2 not implemented," causing compaction to fail; - Old datacoord + new datanode: Utilizes the CompactionV1 interface, resulting in the datanode error "CompactionV1 not implemented," causing compaction to fail. 
issue: https://github.com/milvus-io/milvus/issues/32809 Signed-off-by: bigsheeper --- internal/datacoord/mock_test.go | 2 +- internal/datacoord/session_manager.go | 2 +- internal/datanode/services.go | 4 +- internal/datanode/services_test.go | 6 +- .../distributed/datanode/client/client.go | 6 +- .../datanode/client/client_test.go | 2 +- internal/distributed/datanode/service.go | 4 +- internal/distributed/datanode/service_test.go | 4 +- internal/mocks/mock_datanode.go | 110 +++++++------- internal/mocks/mock_datanode_client.go | 140 +++++++++--------- internal/proto/data_coord.proto | 2 +- internal/util/mock/grpc_datanode_client.go | 2 +- 12 files changed, 142 insertions(+), 142 deletions(-) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index b49bfc91cb5d3..14ff9921eebbc 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -265,7 +265,7 @@ func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMe }, nil } -func (c *mockDataNodeClient) Compaction(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { +func (c *mockDataNodeClient) CompactionV2(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { if c.ch != nil { c.ch <- struct{}{} if c.compactionResp != nil { diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 31e7a3eabf5af..6be4eab1e67bb 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -213,7 +213,7 @@ func (c *SessionManagerImpl) Compaction(ctx context.Context, nodeID int64, plan return err } - resp, err := cli.Compaction(ctx, plan) + resp, err := cli.CompactionV2(ctx, plan) if err := VerifyResponse(resp, err); err != nil { log.Warn("failed to execute compaction", zap.Int64("node", nodeID), zap.Error(err), zap.Int64("planID", plan.GetPlanID())) return err diff --git 
a/internal/datanode/services.go b/internal/datanode/services.go index 33a512a8f826e..73db822a52f1c 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -200,9 +200,9 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe }, nil } -// Compaction handles compaction request from DataCoord +// CompactionV2 handles compaction request from DataCoord // returns status as long as compaction task enqueued or invalid -func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { +func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { log := log.Ctx(ctx).With(zap.Int64("planID", req.GetPlanID())) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { log.Warn("DataNode.Compaction failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err)) diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 904154c9045a8..156b076d8d201 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -226,7 +226,7 @@ func (s *DataNodeServicesSuite) TestCompaction() { Channel: dmChannelName, } - resp, err := node.Compaction(ctx, req) + resp, err := node.CompactionV2(ctx, req) s.NoError(err) s.False(merr.Ok(resp)) }) @@ -245,7 +245,7 @@ func (s *DataNodeServicesSuite) TestCompaction() { }, } - resp, err := node.Compaction(ctx, req) + resp, err := node.CompactionV2(ctx, req) s.NoError(err) s.False(merr.Ok(resp)) }) @@ -265,7 +265,7 @@ func (s *DataNodeServicesSuite) TestCompaction() { Type: datapb.CompactionType_ClusteringCompaction, } - _, err := node.Compaction(ctx, req) + _, err := node.CompactionV2(ctx, req) s.NoError(err) }) } diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 10ba96f0c360f..67d5081a19e8c 100644 --- a/internal/distributed/datanode/client/client.go +++ 
b/internal/distributed/datanode/client/client.go @@ -173,10 +173,10 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest }) } -// Compaction return compaction by given plan -func (c *Client) Compaction(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { +// CompactionV2 return compaction by given plan +func (c *Client) CompactionV2(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { - return client.Compaction(ctx, req) + return client.CompactionV2(ctx, req) }) } diff --git a/internal/distributed/datanode/client/client_test.go b/internal/distributed/datanode/client/client_test.go index f46d39bc12071..03e4b64e74e62 100644 --- a/internal/distributed/datanode/client/client_test.go +++ b/internal/distributed/datanode/client/client_test.go @@ -66,7 +66,7 @@ func Test_NewClient(t *testing.T) { r5, err := client.GetMetrics(ctx, nil) retCheck(retNotNil, r5, err) - r6, err := client.Compaction(ctx, nil) + r6, err := client.CompactionV2(ctx, nil) retCheck(retNotNil, r6, err) r8, err := client.ResendSegmentStats(ctx, nil) diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 9b68853960a4c..2e530546d1977 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -354,8 +354,8 @@ func (s *Server) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsReq return s.datanode.GetMetrics(ctx, request) } -func (s *Server) Compaction(ctx context.Context, request *datapb.CompactionPlan) (*commonpb.Status, error) { - return s.datanode.Compaction(ctx, request) +func (s *Server) CompactionV2(ctx context.Context, request *datapb.CompactionPlan) (*commonpb.Status, error) { + return s.datanode.CompactionV2(ctx, request) } // GetCompactionState gets the Compaction tasks state of 
DataNode diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index 655738732c1e6..66390ae4fcb13 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -126,7 +126,7 @@ func (m *MockDataNode) GetMetrics(ctx context.Context, request *milvuspb.GetMetr return m.metricResp, m.err } -func (m *MockDataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { +func (m *MockDataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { return m.status, m.err } @@ -289,7 +289,7 @@ func Test_NewServer(t *testing.T) { server.datanode = &MockDataNode{ status: &commonpb.Status{}, } - resp, err := server.Compaction(ctx, nil) + resp, err := server.CompactionV2(ctx, nil) assert.NoError(t, err) assert.NotNil(t, resp) }) diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index 1d758de233c54..6ba06b0009b15 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -64,8 +64,8 @@ type MockDataNode_CheckChannelOperationProgress_Call struct { } // CheckChannelOperationProgress is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ChannelWatchInfo +// - _a0 context.Context +// - _a1 *datapb.ChannelWatchInfo func (_e *MockDataNode_Expecter) CheckChannelOperationProgress(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckChannelOperationProgress_Call { return &MockDataNode_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", _a0, _a1)} } @@ -87,8 +87,8 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func return _c } -// Compaction provides a mock function with given fields: _a0, _a1 -func (_m *MockDataNode) Compaction(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) { +// CompactionV2 provides a mock function with given fields: _a0, 
_a1 +func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) var r0 *commonpb.Status @@ -113,31 +113,31 @@ func (_m *MockDataNode) Compaction(_a0 context.Context, _a1 *datapb.CompactionPl return r0, r1 } -// MockDataNode_Compaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Compaction' -type MockDataNode_Compaction_Call struct { +// MockDataNode_CompactionV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompactionV2' +type MockDataNode_CompactionV2_Call struct { *mock.Call } -// Compaction is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.CompactionPlan -func (_e *MockDataNode_Expecter) Compaction(_a0 interface{}, _a1 interface{}) *MockDataNode_Compaction_Call { - return &MockDataNode_Compaction_Call{Call: _e.mock.On("Compaction", _a0, _a1)} +// CompactionV2 is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datapb.CompactionPlan +func (_e *MockDataNode_Expecter) CompactionV2(_a0 interface{}, _a1 interface{}) *MockDataNode_CompactionV2_Call { + return &MockDataNode_CompactionV2_Call{Call: _e.mock.On("CompactionV2", _a0, _a1)} } -func (_c *MockDataNode_Compaction_Call) Run(run func(_a0 context.Context, _a1 *datapb.CompactionPlan)) *MockDataNode_Compaction_Call { +func (_c *MockDataNode_CompactionV2_Call) Run(run func(_a0 context.Context, _a1 *datapb.CompactionPlan)) *MockDataNode_CompactionV2_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(*datapb.CompactionPlan)) }) return _c } -func (_c *MockDataNode_Compaction_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNode_Compaction_Call { +func (_c *MockDataNode_CompactionV2_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNode_CompactionV2_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockDataNode_Compaction_Call) 
RunAndReturn(run func(context.Context, *datapb.CompactionPlan) (*commonpb.Status, error)) *MockDataNode_Compaction_Call { +func (_c *MockDataNode_CompactionV2_Call) RunAndReturn(run func(context.Context, *datapb.CompactionPlan) (*commonpb.Status, error)) *MockDataNode_CompactionV2_Call { _c.Call.Return(run) return _c } @@ -174,8 +174,8 @@ type MockDataNode_DropCompactionPlan_Call struct { } // DropCompactionPlan is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.DropCompactionPlanRequest +// - _a0 context.Context +// - _a1 *datapb.DropCompactionPlanRequest func (_e *MockDataNode_Expecter) DropCompactionPlan(_a0 interface{}, _a1 interface{}) *MockDataNode_DropCompactionPlan_Call { return &MockDataNode_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", _a0, _a1)} } @@ -229,8 +229,8 @@ type MockDataNode_DropImport_Call struct { } // DropImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.DropImportRequest +// - _a0 context.Context +// - _a1 *datapb.DropImportRequest func (_e *MockDataNode_Expecter) DropImport(_a0 interface{}, _a1 interface{}) *MockDataNode_DropImport_Call { return &MockDataNode_DropImport_Call{Call: _e.mock.On("DropImport", _a0, _a1)} } @@ -284,8 +284,8 @@ type MockDataNode_FlushChannels_Call struct { } // FlushChannels is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.FlushChannelsRequest +// - _a0 context.Context +// - _a1 *datapb.FlushChannelsRequest func (_e *MockDataNode_Expecter) FlushChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushChannels_Call { return &MockDataNode_FlushChannels_Call{Call: _e.mock.On("FlushChannels", _a0, _a1)} } @@ -339,8 +339,8 @@ type MockDataNode_FlushSegments_Call struct { } // FlushSegments is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.FlushSegmentsRequest +// - _a0 context.Context +// - _a1 *datapb.FlushSegmentsRequest func (_e 
*MockDataNode_Expecter) FlushSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushSegments_Call { return &MockDataNode_FlushSegments_Call{Call: _e.mock.On("FlushSegments", _a0, _a1)} } @@ -435,8 +435,8 @@ type MockDataNode_GetCompactionState_Call struct { } // GetCompactionState is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.CompactionStateRequest +// - _a0 context.Context +// - _a1 *datapb.CompactionStateRequest func (_e *MockDataNode_Expecter) GetCompactionState(_a0 interface{}, _a1 interface{}) *MockDataNode_GetCompactionState_Call { return &MockDataNode_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", _a0, _a1)} } @@ -490,8 +490,8 @@ type MockDataNode_GetComponentStates_Call struct { } // GetComponentStates is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *milvuspb.GetComponentStatesRequest +// - _a0 context.Context +// - _a1 *milvuspb.GetComponentStatesRequest func (_e *MockDataNode_Expecter) GetComponentStates(_a0 interface{}, _a1 interface{}) *MockDataNode_GetComponentStates_Call { return &MockDataNode_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", _a0, _a1)} } @@ -545,8 +545,8 @@ type MockDataNode_GetMetrics_Call struct { } // GetMetrics is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *milvuspb.GetMetricsRequest +// - _a0 context.Context +// - _a1 *milvuspb.GetMetricsRequest func (_e *MockDataNode_Expecter) GetMetrics(_a0 interface{}, _a1 interface{}) *MockDataNode_GetMetrics_Call { return &MockDataNode_GetMetrics_Call{Call: _e.mock.On("GetMetrics", _a0, _a1)} } @@ -682,8 +682,8 @@ type MockDataNode_GetStatisticsChannel_Call struct { } // GetStatisticsChannel is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *internalpb.GetStatisticsChannelRequest +// - _a0 context.Context +// - _a1 *internalpb.GetStatisticsChannelRequest func (_e *MockDataNode_Expecter) GetStatisticsChannel(_a0 interface{}, _a1 
interface{}) *MockDataNode_GetStatisticsChannel_Call { return &MockDataNode_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", _a0, _a1)} } @@ -737,8 +737,8 @@ type MockDataNode_ImportV2_Call struct { } // ImportV2 is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ImportRequest +// - _a0 context.Context +// - _a1 *datapb.ImportRequest func (_e *MockDataNode_Expecter) ImportV2(_a0 interface{}, _a1 interface{}) *MockDataNode_ImportV2_Call { return &MockDataNode_ImportV2_Call{Call: _e.mock.On("ImportV2", _a0, _a1)} } @@ -833,8 +833,8 @@ type MockDataNode_NotifyChannelOperation_Call struct { } // NotifyChannelOperation is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ChannelOperationsRequest +// - _a0 context.Context +// - _a1 *datapb.ChannelOperationsRequest func (_e *MockDataNode_Expecter) NotifyChannelOperation(_a0 interface{}, _a1 interface{}) *MockDataNode_NotifyChannelOperation_Call { return &MockDataNode_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", _a0, _a1)} } @@ -888,8 +888,8 @@ type MockDataNode_PreImport_Call struct { } // PreImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.PreImportRequest +// - _a0 context.Context +// - _a1 *datapb.PreImportRequest func (_e *MockDataNode_Expecter) PreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_PreImport_Call { return &MockDataNode_PreImport_Call{Call: _e.mock.On("PreImport", _a0, _a1)} } @@ -943,8 +943,8 @@ type MockDataNode_QueryImport_Call struct { } // QueryImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.QueryImportRequest +// - _a0 context.Context +// - _a1 *datapb.QueryImportRequest func (_e *MockDataNode_Expecter) QueryImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryImport_Call { return &MockDataNode_QueryImport_Call{Call: _e.mock.On("QueryImport", _a0, _a1)} } @@ -998,8 +998,8 @@ type 
MockDataNode_QueryPreImport_Call struct { } // QueryPreImport is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.QueryPreImportRequest +// - _a0 context.Context +// - _a1 *datapb.QueryPreImportRequest func (_e *MockDataNode_Expecter) QueryPreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryPreImport_Call { return &MockDataNode_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", _a0, _a1)} } @@ -1053,8 +1053,8 @@ type MockDataNode_QuerySlot_Call struct { } // QuerySlot is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.QuerySlotRequest +// - _a0 context.Context +// - _a1 *datapb.QuerySlotRequest func (_e *MockDataNode_Expecter) QuerySlot(_a0 interface{}, _a1 interface{}) *MockDataNode_QuerySlot_Call { return &MockDataNode_QuerySlot_Call{Call: _e.mock.On("QuerySlot", _a0, _a1)} } @@ -1149,8 +1149,8 @@ type MockDataNode_ResendSegmentStats_Call struct { } // ResendSegmentStats is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.ResendSegmentStatsRequest +// - _a0 context.Context +// - _a1 *datapb.ResendSegmentStatsRequest func (_e *MockDataNode_Expecter) ResendSegmentStats(_a0 interface{}, _a1 interface{}) *MockDataNode_ResendSegmentStats_Call { return &MockDataNode_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats", _a0, _a1)} } @@ -1183,7 +1183,7 @@ type MockDataNode_SetAddress_Call struct { } // SetAddress is a helper method to define mock.On call -// - address string +// - address string func (_e *MockDataNode_Expecter) SetAddress(address interface{}) *MockDataNode_SetAddress_Call { return &MockDataNode_SetAddress_Call{Call: _e.mock.On("SetAddress", address)} } @@ -1225,7 +1225,7 @@ type MockDataNode_SetDataCoordClient_Call struct { } // SetDataCoordClient is a helper method to define mock.On call -// - dataCoord types.DataCoordClient +// - dataCoord types.DataCoordClient func (_e *MockDataNode_Expecter) SetDataCoordClient(dataCoord 
interface{}) *MockDataNode_SetDataCoordClient_Call { return &MockDataNode_SetDataCoordClient_Call{Call: _e.mock.On("SetDataCoordClient", dataCoord)} } @@ -1258,7 +1258,7 @@ type MockDataNode_SetEtcdClient_Call struct { } // SetEtcdClient is a helper method to define mock.On call -// - etcdClient *clientv3.Client +// - etcdClient *clientv3.Client func (_e *MockDataNode_Expecter) SetEtcdClient(etcdClient interface{}) *MockDataNode_SetEtcdClient_Call { return &MockDataNode_SetEtcdClient_Call{Call: _e.mock.On("SetEtcdClient", etcdClient)} } @@ -1300,7 +1300,7 @@ type MockDataNode_SetRootCoordClient_Call struct { } // SetRootCoordClient is a helper method to define mock.On call -// - rootCoord types.RootCoordClient +// - rootCoord types.RootCoordClient func (_e *MockDataNode_Expecter) SetRootCoordClient(rootCoord interface{}) *MockDataNode_SetRootCoordClient_Call { return &MockDataNode_SetRootCoordClient_Call{Call: _e.mock.On("SetRootCoordClient", rootCoord)} } @@ -1354,8 +1354,8 @@ type MockDataNode_ShowConfigurations_Call struct { } // ShowConfigurations is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *internalpb.ShowConfigurationsRequest +// - _a0 context.Context +// - _a1 *internalpb.ShowConfigurationsRequest func (_e *MockDataNode_Expecter) ShowConfigurations(_a0 interface{}, _a1 interface{}) *MockDataNode_ShowConfigurations_Call { return &MockDataNode_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", _a0, _a1)} } @@ -1491,8 +1491,8 @@ type MockDataNode_SyncSegments_Call struct { } // SyncSegments is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.SyncSegmentsRequest +// - _a0 context.Context +// - _a1 *datapb.SyncSegmentsRequest func (_e *MockDataNode_Expecter) SyncSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_SyncSegments_Call { return &MockDataNode_SyncSegments_Call{Call: _e.mock.On("SyncSegments", _a0, _a1)} } @@ -1525,7 +1525,7 @@ type MockDataNode_UpdateStateCode_Call 
struct { } // UpdateStateCode is a helper method to define mock.On call -// - stateCode commonpb.StateCode +// - stateCode commonpb.StateCode func (_e *MockDataNode_Expecter) UpdateStateCode(stateCode interface{}) *MockDataNode_UpdateStateCode_Call { return &MockDataNode_UpdateStateCode_Call{Call: _e.mock.On("UpdateStateCode", stateCode)} } @@ -1579,8 +1579,8 @@ type MockDataNode_WatchDmChannels_Call struct { } // WatchDmChannels is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *datapb.WatchDmChannelsRequest +// - _a0 context.Context +// - _a1 *datapb.WatchDmChannelsRequest func (_e *MockDataNode_Expecter) WatchDmChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_WatchDmChannels_Call { return &MockDataNode_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels", _a0, _a1)} } diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index 8a244305c8daa..0b4f876803145 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -70,9 +70,9 @@ type MockDataNodeClient_CheckChannelOperationProgress_Call struct { } // CheckChannelOperationProgress is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ChannelWatchInfo -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ChannelWatchInfo +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) CheckChannelOperationProgress(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckChannelOperationProgress_Call { return &MockDataNodeClient_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", append([]interface{}{ctx, in}, opts...)...)} @@ -142,8 +142,8 @@ func (_c *MockDataNodeClient_Close_Call) RunAndReturn(run func() error) *MockDat return _c } -// Compaction provides a mock function with given fields: ctx, in, opts -func (_m *MockDataNodeClient) Compaction(ctx context.Context, in 
*datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { +// CompactionV2 provides a mock function with given fields: ctx, in, opts +func (_m *MockDataNodeClient) CompactionV2(ctx context.Context, in *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) for _i := range opts { _va[_i] = opts[_i] @@ -175,21 +175,21 @@ func (_m *MockDataNodeClient) Compaction(ctx context.Context, in *datapb.Compact return r0, r1 } -// MockDataNodeClient_Compaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Compaction' -type MockDataNodeClient_Compaction_Call struct { +// MockDataNodeClient_CompactionV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompactionV2' +type MockDataNodeClient_CompactionV2_Call struct { *mock.Call } -// Compaction is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.CompactionPlan -// - opts ...grpc.CallOption -func (_e *MockDataNodeClient_Expecter) Compaction(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_Compaction_Call { - return &MockDataNodeClient_Compaction_Call{Call: _e.mock.On("Compaction", +// CompactionV2 is a helper method to define mock.On call +// - ctx context.Context +// - in *datapb.CompactionPlan +// - opts ...grpc.CallOption +func (_e *MockDataNodeClient_Expecter) CompactionV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CompactionV2_Call { + return &MockDataNodeClient_CompactionV2_Call{Call: _e.mock.On("CompactionV2", append([]interface{}{ctx, in}, opts...)...)} } -func (_c *MockDataNodeClient_Compaction_Call) Run(run func(ctx context.Context, in *datapb.CompactionPlan, opts ...grpc.CallOption)) *MockDataNodeClient_Compaction_Call { +func (_c *MockDataNodeClient_CompactionV2_Call) Run(run func(ctx context.Context, in *datapb.CompactionPlan, opts ...grpc.CallOption)) 
*MockDataNodeClient_CompactionV2_Call { _c.Call.Run(func(args mock.Arguments) { variadicArgs := make([]grpc.CallOption, len(args)-2) for i, a := range args[2:] { @@ -202,12 +202,12 @@ func (_c *MockDataNodeClient_Compaction_Call) Run(run func(ctx context.Context, return _c } -func (_c *MockDataNodeClient_Compaction_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNodeClient_Compaction_Call { +func (_c *MockDataNodeClient_CompactionV2_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNodeClient_CompactionV2_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockDataNodeClient_Compaction_Call) RunAndReturn(run func(context.Context, *datapb.CompactionPlan, ...grpc.CallOption) (*commonpb.Status, error)) *MockDataNodeClient_Compaction_Call { +func (_c *MockDataNodeClient_CompactionV2_Call) RunAndReturn(run func(context.Context, *datapb.CompactionPlan, ...grpc.CallOption) (*commonpb.Status, error)) *MockDataNodeClient_CompactionV2_Call { _c.Call.Return(run) return _c } @@ -251,9 +251,9 @@ type MockDataNodeClient_DropCompactionPlan_Call struct { } // DropCompactionPlan is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.DropCompactionPlanRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.DropCompactionPlanRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) DropCompactionPlan(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropCompactionPlan_Call { return &MockDataNodeClient_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", append([]interface{}{ctx, in}, opts...)...)} @@ -321,9 +321,9 @@ type MockDataNodeClient_DropImport_Call struct { } // DropImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.DropImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.DropImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) DropImport(ctx 
interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropImport_Call { return &MockDataNodeClient_DropImport_Call{Call: _e.mock.On("DropImport", append([]interface{}{ctx, in}, opts...)...)} @@ -391,9 +391,9 @@ type MockDataNodeClient_FlushChannels_Call struct { } // FlushChannels is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.FlushChannelsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.FlushChannelsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) FlushChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushChannels_Call { return &MockDataNodeClient_FlushChannels_Call{Call: _e.mock.On("FlushChannels", append([]interface{}{ctx, in}, opts...)...)} @@ -461,9 +461,9 @@ type MockDataNodeClient_FlushSegments_Call struct { } // FlushSegments is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.FlushSegmentsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.FlushSegmentsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) FlushSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushSegments_Call { return &MockDataNodeClient_FlushSegments_Call{Call: _e.mock.On("FlushSegments", append([]interface{}{ctx, in}, opts...)...)} @@ -531,9 +531,9 @@ type MockDataNodeClient_GetCompactionState_Call struct { } // GetCompactionState is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.CompactionStateRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.CompactionStateRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetCompactionState(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetCompactionState_Call { return &MockDataNodeClient_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", 
append([]interface{}{ctx, in}, opts...)...)} @@ -601,9 +601,9 @@ type MockDataNodeClient_GetComponentStates_Call struct { } // GetComponentStates is a helper method to define mock.On call -// - ctx context.Context -// - in *milvuspb.GetComponentStatesRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *milvuspb.GetComponentStatesRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetComponentStates(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetComponentStates_Call { return &MockDataNodeClient_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", append([]interface{}{ctx, in}, opts...)...)} @@ -671,9 +671,9 @@ type MockDataNodeClient_GetMetrics_Call struct { } // GetMetrics is a helper method to define mock.On call -// - ctx context.Context -// - in *milvuspb.GetMetricsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *milvuspb.GetMetricsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetMetrics(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetMetrics_Call { return &MockDataNodeClient_GetMetrics_Call{Call: _e.mock.On("GetMetrics", append([]interface{}{ctx, in}, opts...)...)} @@ -741,9 +741,9 @@ type MockDataNodeClient_GetStatisticsChannel_Call struct { } // GetStatisticsChannel is a helper method to define mock.On call -// - ctx context.Context -// - in *internalpb.GetStatisticsChannelRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *internalpb.GetStatisticsChannelRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) GetStatisticsChannel(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetStatisticsChannel_Call { return &MockDataNodeClient_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", append([]interface{}{ctx, in}, opts...)...)} @@ -811,9 +811,9 @@ type MockDataNodeClient_ImportV2_Call struct { } // ImportV2 is 
a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) ImportV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ImportV2_Call { return &MockDataNodeClient_ImportV2_Call{Call: _e.mock.On("ImportV2", append([]interface{}{ctx, in}, opts...)...)} @@ -881,9 +881,9 @@ type MockDataNodeClient_NotifyChannelOperation_Call struct { } // NotifyChannelOperation is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ChannelOperationsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ChannelOperationsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) NotifyChannelOperation(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_NotifyChannelOperation_Call { return &MockDataNodeClient_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", append([]interface{}{ctx, in}, opts...)...)} @@ -951,9 +951,9 @@ type MockDataNodeClient_PreImport_Call struct { } // PreImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.PreImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.PreImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) PreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_PreImport_Call { return &MockDataNodeClient_PreImport_Call{Call: _e.mock.On("PreImport", append([]interface{}{ctx, in}, opts...)...)} @@ -1021,9 +1021,9 @@ type MockDataNodeClient_QueryImport_Call struct { } // QueryImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.QueryImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.QueryImportRequest +// - opts ...grpc.CallOption 
func (_e *MockDataNodeClient_Expecter) QueryImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryImport_Call { return &MockDataNodeClient_QueryImport_Call{Call: _e.mock.On("QueryImport", append([]interface{}{ctx, in}, opts...)...)} @@ -1091,9 +1091,9 @@ type MockDataNodeClient_QueryPreImport_Call struct { } // QueryPreImport is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.QueryPreImportRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.QueryPreImportRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) QueryPreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryPreImport_Call { return &MockDataNodeClient_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", append([]interface{}{ctx, in}, opts...)...)} @@ -1161,9 +1161,9 @@ type MockDataNodeClient_QuerySlot_Call struct { } // QuerySlot is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.QuerySlotRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.QuerySlotRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) QuerySlot(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QuerySlot_Call { return &MockDataNodeClient_QuerySlot_Call{Call: _e.mock.On("QuerySlot", append([]interface{}{ctx, in}, opts...)...)} @@ -1231,9 +1231,9 @@ type MockDataNodeClient_ResendSegmentStats_Call struct { } // ResendSegmentStats is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.ResendSegmentStatsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.ResendSegmentStatsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) ResendSegmentStats(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ResendSegmentStats_Call { return &MockDataNodeClient_ResendSegmentStats_Call{Call: 
_e.mock.On("ResendSegmentStats", append([]interface{}{ctx, in}, opts...)...)} @@ -1301,9 +1301,9 @@ type MockDataNodeClient_ShowConfigurations_Call struct { } // ShowConfigurations is a helper method to define mock.On call -// - ctx context.Context -// - in *internalpb.ShowConfigurationsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *internalpb.ShowConfigurationsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) ShowConfigurations(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ShowConfigurations_Call { return &MockDataNodeClient_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", append([]interface{}{ctx, in}, opts...)...)} @@ -1371,9 +1371,9 @@ type MockDataNodeClient_SyncSegments_Call struct { } // SyncSegments is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.SyncSegmentsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.SyncSegmentsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) SyncSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_SyncSegments_Call { return &MockDataNodeClient_SyncSegments_Call{Call: _e.mock.On("SyncSegments", append([]interface{}{ctx, in}, opts...)...)} @@ -1441,9 +1441,9 @@ type MockDataNodeClient_WatchDmChannels_Call struct { } // WatchDmChannels is a helper method to define mock.On call -// - ctx context.Context -// - in *datapb.WatchDmChannelsRequest -// - opts ...grpc.CallOption +// - ctx context.Context +// - in *datapb.WatchDmChannelsRequest +// - opts ...grpc.CallOption func (_e *MockDataNodeClient_Expecter) WatchDmChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_WatchDmChannels_Call { return &MockDataNodeClient_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels", append([]interface{}{ctx, in}, opts...)...)} diff --git a/internal/proto/data_coord.proto 
b/internal/proto/data_coord.proto index 822dbdb558b04..81af5196a6496 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -110,7 +110,7 @@ service DataNode { // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {} - rpc Compaction(CompactionPlan) returns (common.Status) {} + rpc CompactionV2(CompactionPlan) returns (common.Status) {} rpc GetCompactionState(CompactionStateRequest) returns (CompactionStateResponse) {} rpc SyncSegments(SyncSegmentsRequest) returns (common.Status) {} diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go index 49622ada6f6b1..13ae355738d80 100644 --- a/internal/util/mock/grpc_datanode_client.go +++ b/internal/util/mock/grpc_datanode_client.go @@ -57,7 +57,7 @@ func (m *GrpcDataNodeClient) GetMetrics(ctx context.Context, in *milvuspb.GetMet return &milvuspb.GetMetricsResponse{}, m.Err } -func (m *GrpcDataNodeClient) Compaction(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { +func (m *GrpcDataNodeClient) CompactionV2(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } From 1a9ab52f66ec15fb8d6138a64b3436b2ef29d04a Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sun, 16 Jun 2024 22:09:57 +0800 Subject: [PATCH 06/10] enhance: Ensure the idempotency of compaction task (#33872) /kind enhancement Signed-off-by: bigsheeper --- internal/datanode/compaction/executor.go | 12 +++++++----- internal/datanode/compaction/executor_test.go | 5 +++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index f0e4a427de7ab..167fc03acaaae 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -70,18 
+70,20 @@ func NewExecutor() *executor { } func (e *executor) Execute(task Compactor) { + _, ok := e.executing.GetOrInsert(task.GetPlanID(), task) + if ok { + log.Warn("duplicated compaction task", + zap.Int64("planID", task.GetPlanID()), + zap.String("channel", task.GetChannelName())) + return + } e.taskCh <- task - e.toExecutingState(task) } func (e *executor) Slots() int64 { return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len()) } -func (e *executor) toExecutingState(task Compactor) { - e.executing.Insert(task.GetPlanID(), task) -} - func (e *executor) toCompleteState(task Compactor) { task.Complete() e.executing.GetAndRemove(task.GetPlanID()) diff --git a/internal/datanode/compaction/executor_test.go b/internal/datanode/compaction/executor_test.go index 164852c8e8ad2..81b64556dafe9 100644 --- a/internal/datanode/compaction/executor_test.go +++ b/internal/datanode/compaction/executor_test.go @@ -31,10 +31,11 @@ func TestCompactionExecutor(t *testing.T) { t.Run("Test execute", func(t *testing.T) { planID := int64(1) mockC := NewMockCompactor(t) - mockC.EXPECT().GetPlanID().Return(planID).Once() - mockC.EXPECT().GetChannelName().Return("ch1").Once() + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetChannelName().Return("ch1") executor := NewExecutor() executor.Execute(mockC) + executor.Execute(mockC) assert.EqualValues(t, 1, len(executor.taskCh)) assert.EqualValues(t, 1, executor.executing.Len()) From b72026c85945ae1adad3e4787f81158b64e8dbcb Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 17 Jun 2024 10:07:57 +0800 Subject: [PATCH 07/10] enhance: Add rbac support on describe/alter database api (#33803) issue: #32707 Signed-off-by: Wei Liu --- pkg/util/constant.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/util/constant.go b/pkg/util/constant.go index a416affa97564..75c58435615c1 100644 --- a/pkg/util/constant.go +++ b/pkg/util/constant.go @@ -138,6 +138,8 @@ var ( 
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeCreateDatabase.String()), MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDropDatabase.String()), MetaStore2API(commonpb.ObjectPrivilege_PrivilegeListDatabases.String()), + MetaStore2API(commonpb.ObjectPrivilege_PrivilegeAlterDatabase.String()), + MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDescribeDatabase.String()), MetaStore2API(commonpb.ObjectPrivilege_PrivilegeCreateAlias.String()), MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDropAlias.String()), From 95148866ed11bf514243fafec23bd960e31bb907 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 17 Jun 2024 10:09:57 +0800 Subject: [PATCH 08/10] fix: Don't remove growing L0 segment in datanode metacache (#33829) issue: #33540 1. growing L0 segments are invisible to datacoord. 2. flushed L0 segments need to be cleaned by datacoord. Signed-off-by: Cai Zhang --- internal/datacoord/sync_segments_scheduler.go | 2 +- internal/datanode/metacache/meta_cache.go | 3 +- internal/datanode/services.go | 3 +- internal/datanode/services_test.go | 138 +++++++++++++++++- 4 files changed, 136 insertions(+), 10 deletions(-) diff --git a/internal/datacoord/sync_segments_scheduler.go b/internal/datacoord/sync_segments_scheduler.go index 69348c0de7608..f5224f7110b9e 100644 --- a/internal/datacoord/sync_segments_scheduler.go +++ b/internal/datacoord/sync_segments_scheduler.go @@ -115,7 +115,7 @@ func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.String("channelName", channelName), zap.Int64("nodeID", nodeID)) segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool { - return info.GetPartitionID() == partitionID && isSegmentHealthy(info) && info.GetLevel() != datapb.SegmentLevel_L0 + return info.GetPartitionID() == partitionID && isSegmentHealthy(info) })) req := &datapb.SyncSegmentsRequest{ ChannelName:
channelName, diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index 1c85946932943..221cf5a8f2962 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -291,7 +291,8 @@ func (c *metaCacheImpl) UpdateSegmentView(partitionID int64, } for segID, info := range c.segmentInfos { - if info.partitionID != partitionID { + if info.partitionID != partitionID || + (info.Level() == datapb.SegmentLevel_L0 && info.State() == commonpb.SegmentState_Growing) { continue } if _, ok := allSegments[segID]; !ok { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 73db822a52f1c..6608144016d48 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -313,8 +313,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments for _, segID := range missingSegments { segID := segID + newSeg := req.GetSegmentInfos()[segID] + newSegments = append(newSegments, newSeg) future := io.GetOrCreateStatsPool().Submit(func() (any, error) { - newSeg := req.GetSegmentInfos()[segID] var val *metacache.BloomFilterSet var err error err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()}) diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 156b076d8d201..52a2edb8a0444 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -530,7 +530,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady) }) - s.Run("normal case", func() { + s.Run("dataSyncService not exist", func() { s.SetupTest() ctx := context.Background() req := &datapb.SyncSegmentsRequest{ @@ -538,12 +538,15 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { PartitionId: 2, CollectionId: 1, SegmentInfos: map[int64]*datapb.SyncSegmentInfo{ 
- 3: { - SegmentId: 3, - PkStatsLog: nil, - State: commonpb.SegmentState_Dropped, - Level: 2, - NumOfRows: 1024, + 102: { + SegmentId: 102, + PkStatsLog: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: nil, + }, + State: commonpb.SegmentState_Flushed, + Level: 2, + NumOfRows: 1024, }, }, } @@ -552,6 +555,127 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { s.NoError(err) s.False(merr.Ok(status)) }) + + s.Run("normal case", func() { + s.SetupTest() + cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{ + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + Description: "", + DataType: schemapb.DataType_Int64, + }, + }, + }, + Vchan: &datapb.VchannelInfo{}, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 100, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Growing, + Level: datapb.SegmentLevel_L0, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 101, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 102, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 103, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + }, func(*datapb.SegmentInfo) 
*metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + mockFlowgraphManager := NewMockFlowgraphManager(s.T()) + mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{ + metacache: cache, + }, true) + s.node.flowgraphManager = mockFlowgraphManager + ctx := context.Background() + req := &datapb.SyncSegmentsRequest{ + ChannelName: "channel1", + PartitionId: 2, + CollectionId: 1, + SegmentInfos: map[int64]*datapb.SyncSegmentInfo{ + 103: { + SegmentId: 103, + PkStatsLog: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: nil, + }, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + NumOfRows: 1024, + }, + 104: { + SegmentId: 104, + PkStatsLog: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: nil, + }, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + NumOfRows: 1024, + }, + }, + } + + status, err := s.node.SyncSegments(ctx, req) + s.NoError(err) + s.True(merr.Ok(status)) + + info, exist := cache.GetSegmentByID(100) + s.True(exist) + s.NotNil(info) + + info, exist = cache.GetSegmentByID(101) + s.False(exist) + s.Nil(info) + + info, exist = cache.GetSegmentByID(102) + s.False(exist) + s.Nil(info) + + info, exist = cache.GetSegmentByID(103) + s.True(exist) + s.NotNil(info) + + info, exist = cache.GetSegmentByID(104) + s.True(exist) + s.NotNil(info) + }) } func (s *DataNodeServicesSuite) TestDropCompactionPlan() { From 188ee433b915b9a56df5dc5fdad02858fece7e41 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 17 Jun 2024 10:37:58 +0800 Subject: [PATCH 09/10] enhance: Refine name rule check error msg (#33815) Signed-off-by: Wei Liu --- internal/proxy/util.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 8982a17800aca..c16298c2f4705 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -268,7 +268,7 @@ func validateFieldName(fieldName string) error { for i := 1; i < fieldNameSize; i++ { c 
:= fieldName[i] if c != '_' && !isAlpha(c) && !isNumber(c) { - msg := invalidMsg + "Field name cannot only contain numbers, letters, and underscores." + msg := invalidMsg + "Field name can only contain numbers, letters, and underscores." return merr.WrapErrFieldNameInvalid(fieldName, msg) } } @@ -1073,7 +1073,7 @@ func validateIndexName(indexName string) error { for i := 1; i < indexNameSize; i++ { c := indexName[i] if c != '_' && !isAlpha(c) && !isNumber(c) { - msg := invalidMsg + "Index name cannot only contain numbers, letters, and underscores." + msg := invalidMsg + "Index name can only contain numbers, letters, and underscores." return errors.New(msg) } } From f993b2913bf7f6d367600c7a28824818ff8e44b9 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 17 Jun 2024 12:06:04 +0800 Subject: [PATCH 10/10] enhance: Reserve space of payload writer when serializing data (#33817) See also #33561 #33562 Signed-off-by: Congqi Xia --- internal/storage/data_codec.go | 1 + internal/storage/payload.go | 1 + internal/storage/payload_writer.go | 4 ++++ 3 files changed, 6 insertions(+) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 59f2b89ad451e..e578c3ec5320e 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -290,6 +290,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique return nil, err } eventWriter.SetEventTimestamp(startTs, endTs) + eventWriter.Reserve(int(rowNum)) var memorySize int64 for _, block := range data { diff --git a/internal/storage/payload.go b/internal/storage/payload.go index 99fa0f52094e3..683b91b4014cc 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -47,6 +47,7 @@ type PayloadWriterInterface interface { GetPayloadBufferFromWriter() ([]byte, error) GetPayloadLengthFromWriter() (int, error) ReleasePayloadWriter() + Reserve(size int) Close() } diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go
index 819b20b1b4654..b8b3c68f23b42 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -535,6 +535,10 @@ func (w *NativePayloadWriter) FinishPayloadWriter() error { ) } +func (w *NativePayloadWriter) Reserve(size int) { + w.builder.Reserve(size) +} + func (w *NativePayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) { data := w.output.Bytes()