diff --git a/db/version_set.cc b/db/version_set.cc
index c2403edcf244..4233134cb99a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2035,8 +2035,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
         mutable_cf_options_.prefix_extractor, should_sample_file_read(),
         cfd_->internal_stats()->GetFileReadHist(level),
-        TableReaderCaller::kUserIterator, IsFilterSkipped(level, read_options), level,
-        &range_del_agg));
+        TableReaderCaller::kUserIterator, IsFilterSkipped(level, read_options),
+        level, &range_del_agg));
     status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
                                  iter.get(), overlap);
   }
@@ -2714,8 +2714,9 @@ Status Version::ProcessBatch(
   while (f) {
     MultiGetRange file_range = fp.CurrentFileRange();
     TableCache::TypedHandle* table_handle = nullptr;
-    bool skip_filters = IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
-                                        fp.IsHitFileLastInLevel());
+    bool skip_filters =
+        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()), read_options,
+                        fp.IsHitFileLastInLevel());
     bool skip_range_deletions = false;
     if (!skip_filters) {
       Status status = table_cache_->MultiGetFilter(
@@ -5647,7 +5648,7 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
   }

   if (edit->HasReplicationSequence()) {
-      replication_sequence_ = edit->GetReplicationSequence();
+    replication_sequence_ = edit->GetReplicationSequence();
   }

   // The builder can be nullptr only if edit is WAL manipulation,
diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc
index 030cd8d07a21..e7ecead8d515 100644
--- a/file/random_access_file_reader.cc
+++ b/file/random_access_file_reader.cc
@@ -10,7 +10,9 @@
 #include "file/random_access_file_reader.h"

 #include
+#include
 #include
+#include

 #include "file/file_util.h"
 #include "monitoring/histogram.h"
@@ -599,4 +601,105 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
   RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
   delete read_async_info;
 }
+
+// Callback data for the non-direct IO version of MultiReadAsync.
+struct MultiReadAsyncCbInfo {
+  MultiReadAsyncCbInfo(
+      std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
+      uint64_t start_time)
+      : cb_(cb), cb_arg_(cb_arg), start_time_(start_time) {}
+
+  std::function<void(const FSReadRequest*, size_t, void*)> cb_;
+  void* cb_arg_;
+  uint64_t start_time_;
+#ifndef ROCKSDB_LITE
+  FileOperationInfo::StartTimePoint fs_start_ts_;
+#endif
+};
+
+IOStatus RandomAccessFileReader::MultiReadAsync(
+    FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
+    std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
+    void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
+    AlignedBuf* /* aligned_buf */) {
+  IOStatus s;
+  uint64_t elapsed = 0;
+
+  if (use_direct_io()) {
+    // Direct IO support is not implemented for MultiReadAsync.
+    abort();
+  }
+
+  // Create a callback and populate info.
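+  // The info object is heap-allocated because it must outlive this call: it
+  // carries the caller's cb/cb_arg and the start timestamp, and is deleted by
+  // MultiReadAsyncCallback once the batch completes (or below, if submission
+  // fails).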
+  auto read_async_callback = std::bind(
+      &RandomAccessFileReader::MultiReadAsyncCallback, this,
+      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
+
+  auto cb_info = new MultiReadAsyncCbInfo(cb, cb_arg, clock_->NowMicros());
+#ifndef ROCKSDB_LITE
+  if (ShouldNotifyListeners()) {
+    cb_info->fs_start_ts_ = FileOperationInfo::StartNow();
+  }
+#endif
+
+  StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
+               true /*overwrite*/, true /*delay_enabled*/);
+  s = file_->MultiReadAsync(reqs, num_reqs, opts, read_async_callback, cb_info,
+                            io_handles, num_io_handles, del_fns, nullptr);
+
+  RecordTick(stats_, READ_ASYNC_MICROS, elapsed);
+
+// Suppress false positive clang analyzer warnings.
+// Memory is not released if file_->MultiReadAsync returns !s.ok(), because
+// MultiReadAsyncCallback is never called in that case. If
+// MultiReadAsyncCallback is called then MultiReadAsync should always return
+// IOStatus::OK().
+#ifndef __clang_analyzer__
+  if (!s.ok()) {
+    delete cb_info;
+  }
+#endif  // __clang_analyzer__
+
+  return s;
+}
+
+void RandomAccessFileReader::MultiReadAsyncCallback(const FSReadRequest* reqs,
+                                                    size_t n_reqs,
+                                                    void* cb_arg) {
+  auto cb_info = static_cast<MultiReadAsyncCbInfo*>(cb_arg);
+  assert(cb_info);
+  assert(cb_info->cb_);
+
+  cb_info->cb_(reqs, n_reqs, cb_info->cb_arg_);
+
+  // Update stats and notify listeners.
+  if (stats_ != nullptr && file_read_hist_ != nullptr) {
+    // elapsed doesn't take into account delay and overwrite as StopWatch does
+    // in Read.
+    uint64_t elapsed = clock_->NowMicros() - cb_info->start_time_;
+    file_read_hist_->Add(elapsed);
+  }
+
+  for (size_t idx = 0; idx < n_reqs; idx++) {
+    auto& req = reqs[idx];
+    if (req.status.ok()) {
+      RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
+    } else if (!req.status.IsAborted()) {
+      RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
+    }
+#ifndef ROCKSDB_LITE
+    if (ShouldNotifyListeners()) {
+      auto finish_ts = FileOperationInfo::FinishNow();
+      NotifyOnFileReadFinish(req.offset, req.result.size(),
+                             cb_info->fs_start_ts_, finish_ts, req.status);
+    }
+    if (!req.status.ok()) {
+      NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
+                      req.result.size(), req.offset);
+    }
+#endif
+    RecordIOStats(stats_, file_temperature_, is_last_level_,
+                  req.result.size());
+  }
+  delete cb_info;
+}
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h
index ea7cfd234f9a..c5f7f283da98 100644
--- a/file/random_access_file_reader.h
+++ b/file/random_access_file_reader.h
@@ -9,6 +9,7 @@
 #pragma once
 #include
+#include
 #include
 #include

@@ -212,6 +213,14 @@ class RandomAccessFileReader {
                      void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
                      AlignedBuf* aligned_buf);
+  IOStatus MultiReadAsync(
+      FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
+      std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
+      void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
+      AlignedBuf* aligned_buf);
+
   void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
+
+  // Callback for the non-direct IO version of MultiReadAsync.
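+  // Invoked by the FileSystem once every request in a MultiReadAsync batch
+  // has completed; forwards the results to the original callback and records
+  // per-request statistics.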
+  void MultiReadAsyncCallback(const FSReadRequest*, size_t, void*);
 };

 }  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h
index 91ad47218e68..ef3b7b6c2cf4 100644
--- a/include/rocksdb/file_system.h
+++ b/include/rocksdb/file_system.h
@@ -20,6 +20,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -918,6 +919,78 @@ class FSRandomAccessFile {
     return IOStatus::OK();
   }

+  // EXPERIMENTAL
+  // This API reads the data requested by a set of FSReadRequests
+  // asynchronously. This is an asynchronous call, i.e. it should return after
+  // submitting the requests.
+  //
+  // When the read requests are completed, the callback function specified in
+  // cb should be called with arguments cb_arg and the FSReadRequests, with
+  // their result and status fields updated by the FileSystem. cb_arg should
+  // be used by the callback to track the original requests submitted.
+  //
+  // This API should also populate io_handles, which should be used by the
+  // underlying FileSystem to store the context in order to distinguish the
+  // read requests on their side, and provide the custom deletion functions in
+  // del_fns. RocksDB guarantees that the del_fn for an io_handle will be
+  // called after receiving the callback. Furthermore, RocksDB guarantees that
+  // if it calls the Poll API for this io_handle, del_fn will be called after
+  // Poll returns. RocksDB is responsible for managing the lifetime of
+  // io_handles.
+  //
+  // The caller preallocates the io_handles and del_fns arrays to be the same
+  // size as the number of requests (num_reqs). The num_io_handles parameter
+  // is used to pass out how many io_handles (and corresponding del_fns) were
+  // populated during the call. num_io_handles must be pre-initialized to the
+  // maximum size of the io_handles/del_fns arrays (num_reqs) on the function
+  // call.
+  //
+  // reqs contains the offset and size of each read request as input; the
+  // result and status fields are output parameters set by the underlying
+  // FileSystem. The data should always be read into the scratch field.
+  //
+  // The default implementation delegates to ReadAsync for each request.
+  virtual IOStatus MultiReadAsync(
+      FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
+      std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
+      void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
+      IODebugContext* dbg) {
+    assert(*num_io_handles == num_reqs);
+    *num_io_handles = 0;
+    // Counter that we use to keep track of how many individual async reads
+    // are still in progress.
+    std::atomic<size_t> in_flight = 0;
+    for (size_t idx = 0; idx < num_reqs; idx++) {
+      auto& req = reqs[idx];
+      auto local_cb = [&in_flight](const FSReadRequest&, void*) {
+        // We are done with this read; decrement the counter and signal.
+        in_flight--;
+        in_flight.notify_one();
+      };
+      auto status = ReadAsync(req, opts, local_cb, nullptr, &io_handles[idx],
+                              &del_fns[idx], dbg);
+      if (status != IOStatus::OK()) {
+        // Delete in-progress io_handles.
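+        // A failed ReadAsync means the remaining requests are never
+        // submitted, so release the handles handed out so far and report the
+        // error for the whole batch.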
+        for (size_t k = 0; k < *num_io_handles; k++) {
+          if (io_handles[k] && del_fns[k]) {
+            del_fns[k](io_handles[k]);
+            io_handles[k] = nullptr;
+          }
+        }
+        *num_io_handles = 0;
+        return status;
+      }
+      (*num_io_handles)++;
+      in_flight++;
+    }
+    for (auto cv = in_flight.load(); cv != 0;
+         in_flight.wait(cv), cv = in_flight.load()) {
+    }
+    // The whole batch has completed, so invoke the caller-provided cb.
+    cb(reqs, num_reqs, cb_arg);
+    return IOStatus::OK();
+  }
+
   // EXPERIMENTAL
   // When available, returns the actual temperature for the file. This is
   // useful in case some outside process moves a file from one tier to another,
@@ -1616,6 +1689,14 @@ class FSRandomAccessFileWrapper : public FSRandomAccessFile {
                      IODebugContext* dbg) override {
     return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg);
   }
+  IOStatus MultiReadAsync(
+      FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
+      std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
+      void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
+      IODebugContext* dbg) override {
+    return target()->MultiReadAsync(reqs, num_reqs, opts, cb, cb_arg,
+                                    io_handles, num_io_handles, del_fns, dbg);
+  }
   Temperature GetTemperature() const override {
     return target_->GetTemperature();
   }
diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc
index 080c1ae96689..2cf34faa6a7f 100644
--- a/util/async_file_reader.cc
+++ b/util/async_file_reader.cc
@@ -19,22 +19,32 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
   num_reqs_ += awaiter->num_reqs_;
   awaiter->io_handle_.resize(awaiter->num_reqs_);
   awaiter->del_fn_.resize(awaiter->num_reqs_);
-  for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
-    IOStatus s = awaiter->file_->ReadAsync(
-        awaiter->read_reqs_[i], awaiter->opts_,
-        [](const FSReadRequest& req, void* cb_arg) {
-          FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
-          read_req->status = req.status;
-          read_req->result = req.result;
-        },
-        &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],
-        /*aligned_buf=*/nullptr);
-    if (!s.ok()) {
-      // For any non-ok status, the FileSystem will not call the callback
-      // So let's update the status ourselves
+  size_t num_io_handles = awaiter->num_reqs_;
+  IOStatus s = awaiter->file_->MultiReadAsync(
+      awaiter->read_reqs_, awaiter->num_reqs_, awaiter->opts_,
+      [](const FSReadRequest* reqs, size_t n_reqs, void* cb_arg) {
+        FSReadRequest* read_reqs = static_cast<FSReadRequest*>(cb_arg);
+        if (read_reqs != reqs) {
+          for (size_t idx = 0; idx < n_reqs; idx++) {
+            read_reqs[idx].status = reqs[idx].status;
+            read_reqs[idx].result = reqs[idx].result;
+          }
+        }
+      },
+      awaiter->read_reqs_, &awaiter->io_handle_[0], &num_io_handles,
+      &awaiter->del_fn_[0],
+      /*aligned_buf=*/nullptr);
+  if (!s.ok()) {
+    assert(num_io_handles == 0);
+    // For any non-ok status, the FileSystem will not call the callback.
+    // So let's update the status ourselves, assuming the whole batch failed.
+    for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
       awaiter->read_reqs_[i].status = s;
     }
   }
+  assert(num_io_handles <= awaiter->num_reqs_);
+  awaiter->io_handle_.resize(num_io_handles);
+  awaiter->del_fn_.resize(num_io_handles);
   return true;
 }
@@ -42,33 +52,43 @@ void AsyncFileReader::Wait() {
   if (!head_) {
     return;
   }
-  ReadAwaiter* waiter;
+
+  // TODO: No need to copy if we have 1 awaiter.
+  // The Poll API seems to encourage this inefficiency.
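+  // Gather the io_handles of every pending awaiter so that a single Poll()
+  // call below can wait for all outstanding reads at once.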
   std::vector<void*> io_handles;
-  IOStatus s;
   io_handles.reserve(num_reqs_);
-  waiter = head_;
+
+  ReadAwaiter* waiter = head_;
   do {
-    for (size_t i = 0; i < waiter->num_reqs_; ++i) {
+    for (size_t i = 0; i < waiter->io_handle_.size(); ++i) {
       if (waiter->io_handle_[i]) {
         io_handles.push_back(waiter->io_handle_[i]);
       }
     }
   } while (waiter != tail_ && (waiter = waiter->next_));
+
+  IOStatus s = IOStatus::OK();
   if (io_handles.size() > 0) {
     StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
     s = fs_->Poll(io_handles, io_handles.size());
   }
+
   do {
     waiter = head_;
     head_ = waiter->next_;
-    for (size_t i = 0; i < waiter->num_reqs_; ++i) {
+    for (size_t i = 0; i < waiter->io_handle_.size(); ++i) {
       if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
         waiter->del_fn_[i](waiter->io_handle_[i]);
       }
-      if (waiter->read_reqs_[i].status.ok() && !s.ok()) {
-        // Override the request status with the Poll error
-        waiter->read_reqs_[i].status = s;
+    }
+    if (!s.ok()) {
+      for (size_t i = 0; i < waiter->num_reqs_; ++i) {
+        if (waiter->read_reqs_[i].status.ok()) {
+          // Override the request status with the Poll error.
+          waiter->read_reqs_[i].status = s;
+        }
       }
     }
     waiter->awaiting_coro_.resume();