Skip to content

Commit

Permalink
Fix IndexReader (#2198)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix possible multi-thread race condition in `GetColumnIndexReader()`
Fix python test script

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
  • Loading branch information
yangzq50 authored Nov 7, 2024
1 parent ce69036 commit fd65303
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
14 changes: 7 additions & 7 deletions python/parallel_test/test_index_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,18 +272,18 @@ def index_worker(connection_pool: ConnectionPool, table_name, column_name, index
ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
# print(f"thread {thread_id}: index {index_name} created")
logging.info(f"thread {thread_id}: index {index_name} created")
self.logger.info(f"thread {thread_id}: index {index_name} created")
else:
# print(f"thread {thread_id}: create_index {index_name} failed: {res.error_msg}")
logging.info(f"thread {thread_id}: create_index {index_name} failed: {res.error_msg}")
self.logger.info(f"thread {thread_id}: create_index {index_name} failed: {res.error_msg}")
time.sleep(0.5)
res = table_obj.drop_index(index_name, ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
# print(f"thread {thread_id}: index {index_name} deleted")
logging.info(f"thread {thread_id}: index {index_name} deleted")
self.logger.info(f"thread {thread_id}: index {index_name} deleted")
else:
# print(f"thread {thread_id}: delete_index {index_name} failed: {res.error_msg}")
logging.info(f"thread {thread_id}: delete_index {index_name} failed: {res.error_msg}")
self.logger.info(f"thread {thread_id}: delete_index {index_name} failed: {res.error_msg}")
time.sleep(0.5)

connection_pool.release_conn(infinity_obj)
Expand All @@ -300,7 +300,7 @@ def insert_worker(connection_pool: ConnectionPool, table_name, data, end_time, t
"docdate": data["docdate"][i], "body": data["body"][i]})
table_obj.insert(value)
# print(f"thread {thread_id}: put data")
logging.info(f"thread {thread_id}: put data")
self.logger.info(f"thread {thread_id}: put data")
time.sleep(1)

connection_pool.release_conn(infinity_obj)
Expand All @@ -315,10 +315,10 @@ def query_worker(connection_pool: ConnectionPool, table_name, end_time, thread_i
res = table_obj.output(["doctitle", "docdate", "_row_id", "_score"]).match_text(
"body^5", "harmful chemical", 3).to_pl()
# print(f"thread {thread_id}: check result:\n{res}")
logging.info(f"thread {thread_id}: check result:\n{res}")
self.logger.info(f"thread {thread_id}: check result:\n{res}")
except Exception as e:
# print(f"thread {thread_id}: check failed: {e}")
logging.info(f"thread {thread_id}: check failed: {e}")
self.logger.info(f"thread {thread_id}: check failed: {e}")
time.sleep(0.5)

connection_pool.release_conn(infinity_obj)
Expand Down
8 changes: 7 additions & 1 deletion src/storage/invertedindex/column_index_reader.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ struct Hash {
} // namespace detail

export struct IndexReader {
ColumnIndexReader *GetColumnIndexReader(u64 column_id) const { return (*column_index_readers_)[column_id].get(); }
ColumnIndexReader *GetColumnIndexReader(const u64 column_id) const {
if (const auto it = column_index_readers_->find(column_id); it != column_index_readers_->end()) {
return it->second.get();
}
return nullptr;
}

const Map<String, String> &GetColumn2Analyzer() const { return *column2analyzer_; }

SharedPtr<FlatHashMap<u64, SharedPtr<ColumnIndexReader>, detail::Hash<u64>>> column_index_readers_;
Expand Down

0 comments on commit fd65303

Please sign in to comment.