From 3e0c7dec36a3e631ccdd5791dc38f01b9fca77e7 Mon Sep 17 00:00:00 2001 From: Dmitri Petrov Date: Fri, 13 Oct 2023 17:35:10 +0000 Subject: [PATCH] [SYS-6179] Add MultiReadAsync file API. Extending the file API to provide an option to implement asynchronous multi read operation. The default implementation delegates to AsyncRead. Updated the AsyncFileReader class implementation to use MultiReadAsync. --- db/version_set.cc | 6 +- file/random_access_file_reader.cc | 103 ++++++++++++++++++++++++++++++ file/random_access_file_reader.h | 9 +++ include/rocksdb/file_system.h | 81 +++++++++++++++++++++++ util/async_file_reader.cc | 62 ++++++++++++------ 5 files changed, 237 insertions(+), 24 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index 30911871831a..5d2c84f2b319 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2035,8 +2035,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level), mutable_cf_options_.prefix_extractor, should_sample_file_read(), cfd_->internal_stats()->GetFileReadHist(level), - TableReaderCaller::kUserIterator, IsFilterSkipped(level, read_options), level, - &range_del_agg)); + TableReaderCaller::kUserIterator, IsFilterSkipped(level, read_options), + level, &range_del_agg)); status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key, iter.get(), overlap); } @@ -5648,7 +5648,7 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, } if (edit->HasReplicationSequence()) { - replication_sequence_ = edit->GetReplicationSequence(); + replication_sequence_ = edit->GetReplicationSequence(); } // The builder can be nullptr only if edit is WAL manipulation, diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 030cd8d07a21..e7ecead8d515 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -10,7 +10,9 @@ #include "file/random_access_file_reader.h" #include +#include #include +#include #include "file/file_util.h" #include "monitoring/histogram.h" @@ -599,4 +601,105 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req, RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); delete read_async_info; } + +// Callback data for non-direct IO version of MultiReadAsync. +struct MultiReadAsyncCbInfo { + MultiReadAsyncCbInfo( + std::function cb, void* cb_arg, + uint64_t start_time) + : cb_(cb), cb_arg_(cb_arg), start_time_(start_time) {} + + std::function cb_; + void* cb_arg_; + uint64_t start_time_; +#ifndef ROCKSDB_LITE + FileOperationInfo::StartTimePoint fs_start_ts_; +#endif +}; + +IOStatus RandomAccessFileReader::MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + AlignedBuf* /* aligned_buf */) { + IOStatus s; + uint64_t elapsed = 0; + + if (use_direct_io()) { + // DirectIO support not implemented for MultiReadAsync + abort(); + } + + // Create a callback and populate info. + auto read_async_callback = std::bind( + &RandomAccessFileReader::MultiReadAsyncCallback, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + + auto cb_info = new MultiReadAsyncCbInfo(cb, cb_arg, clock_->NowMicros()); +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + cb_info->fs_start_ts_ = FileOperationInfo::StartNow(); + } +#endif + + StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed, + true /*overwrite*/, true /*delay_enabled*/); + s = file_->MultiReadAsync(reqs, num_reqs, opts, read_async_callback, cb_info, + io_handles, num_io_handles, del_fns, nullptr); + + RecordTick(stats_, READ_ASYNC_MICROS, elapsed); + +// Suppress false positive clang analyzer warnings. +// Memory is not released if file_->ReadAsync returns !s.ok(), because +// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is +// called then ReadAsync should always return IOStatus::OK(). +#ifndef __clang_analyzer__ + if (!s.ok()) { + delete cb_info; + } +#endif // __clang_analyzer__ + + return s; +} + +void RandomAccessFileReader::MultiReadAsyncCallback(const FSReadRequest* reqs, + size_t n_reqs, + void* cb_arg) { + auto cb_info = static_cast(cb_arg); + assert(cb_info); + assert(cb_info->cb_); + + cb_info->cb_(reqs, n_reqs, cb_info->cb_arg_); + + // Update stats and notify listeners. + if (stats_ != nullptr && file_read_hist_ != nullptr) { + // elapsed doesn't take into account delay and overwrite as StopWatch does + // in Read. + uint64_t elapsed = clock_->NowMicros() - cb_info->start_time_; + file_read_hist_->Add(elapsed); + } + + for (size_t idx = 0; idx < n_reqs; idx++) { + auto& req = reqs[idx]; + if (req.status.ok()) { + RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size()); + } else if (!req.status.IsAborted()) { + RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + NotifyOnFileReadFinish(req.offset, req.result.size(), + cb_info->fs_start_ts_, finish_ts, req.status); + } + if (!req.status.ok()) { + NotifyOnIOError(req.status, FileOperationType::kRead, file_name(), + req.result.size(), req.offset); + } +#endif + RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); + } + delete cb_info; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index ea7cfd234f9a..c5f7f283da98 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -9,6 +9,7 @@ #pragma once #include +#include #include #include @@ -212,6 +213,14 @@ class RandomAccessFileReader { void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf); + IOStatus MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + AlignedBuf* aligned_buf); + void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); + // Callback for non-directIO MultiReadAsync. + void MultiReadAsyncCallback(const FSReadRequest*, size_t, void*); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 91ad47218e68..ef3b7b6c2cf4 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -918,6 +919,78 @@ class FSRandomAccessFile { return IOStatus::OK(); } + // EXPERIMENTAL + // This API reads the requested data in a set of FSReadRequest asynchronously. + // This is an asynchronous call, i.e it should return after submitting the + // request. + // + // When the read request is completed, callback function specified in cb + // should be called with arguments cb_arg and the result populated in + // FSReadRequest with result and status fileds updated by FileSystem. + // cb_arg should be used by the callback to track the original request + // submitted. + // + // This API should also populate io_handles which should be used by + // underlying FileSystem to store the context in order to distinguish the read + // requests at their side and provide the custom deletion functions in + // del_fns. RocksDB guarantees that the del_fn for io_handle will be called + // after receiving the callback. Furthermore, RocksDB guarantees that if it + // calls the Poll API for this io_handle, del_fn will be called after the Poll + // returns. RocksDB is responsible for managing the lifetime of io_handles. + // + // The caller preallocates io_handles and del_fns arrays to be be the same + // size as the number of requests (num_reqs). num_io_handles parameter is + // used to pass out the information about how many io_handles (and + // corresponding del_funs) were populated during the call. num_io_handles + // must be pre-initiailized to the maximum size of io_handles/del_funs arrays + // (num_reqs) on the function call. + // + // reqs contains the request offset and size passed as input parameter of read + // request and result and status fields are output parameter set by underlying + // FileSystem. The data should always be read into scratch field. + // + // Default implementation delegates to ReadAsync for each request. + virtual IOStatus MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + IODebugContext* dbg) { + assert(*num_io_handles == num_reqs); + *num_io_handles = 0; + // Counter that we use keep track of how many individual async reads are + // still in progress. + std::atomic in_flight = 0; + for (size_t idx = 0; idx < num_reqs; idx++) { + auto& req = reqs[idx]; + auto local_cb = [&in_flight](const FSReadRequest&, void*) { + // We are done with this read, decrement the counter and signal. + in_flight--; + in_flight.notify_one(); + }; + auto status = ReadAsync(req, opts, local_cb, nullptr, &io_handles[idx], + &del_fns[idx], dbg); + if (status != IOStatus::OK()) { + // Delete in-progress IO-handles. + for (size_t k = 0; k < *num_io_handles; k++) { + if (io_handles[k] && del_fns[k]) { + del_fns[k](io_handles[k]); + io_handles[k] = nullptr; + } + } + *num_io_handles = 0; + return status; + } + (*num_io_handles)++; + in_flight++; + } + for (auto cv = in_flight.load(); cv != 0; + in_flight.wait(cv), cv = in_flight.load()) { + } + // the operation is completed, call the 'op_cb' + cb(reqs, num_reqs, cb_arg); + return IOStatus::OK(); + } + // EXPERIMENTAL // When available, returns the actual temperature for the file. This is // useful in case some outside process moves a file from one tier to another, @@ -1616,6 +1689,14 @@ class FSRandomAccessFileWrapper : public FSRandomAccessFile { IODebugContext* dbg) override { return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg); } + IOStatus MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + IODebugContext* dbg) override { + return target()->MultiReadAsync(reqs, num_reqs, opts, cb, cb_arg, + io_handles, num_io_handles, del_fns, dbg); + } Temperature GetTemperature() const override { return target_->GetTemperature(); } diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc index 080c1ae96689..2cf34faa6a7f 100644 --- a/util/async_file_reader.cc +++ b/util/async_file_reader.cc @@ -19,22 +19,32 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) { num_reqs_ += awaiter->num_reqs_; awaiter->io_handle_.resize(awaiter->num_reqs_); awaiter->del_fn_.resize(awaiter->num_reqs_); - for (size_t i = 0; i < awaiter->num_reqs_; ++i) { - IOStatus s = awaiter->file_->ReadAsync( - awaiter->read_reqs_[i], awaiter->opts_, - [](const FSReadRequest& req, void* cb_arg) { - FSReadRequest* read_req = static_cast(cb_arg); - read_req->status = req.status; - read_req->result = req.result; - }, - &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i], - /*aligned_buf=*/nullptr); - if (!s.ok()) { - // For any non-ok status, the FileSystem will not call the callback - // So let's update the status ourselves + size_t num_io_handles = awaiter->num_reqs_; + IOStatus s = awaiter->file_->MultiReadAsync( + awaiter->read_reqs_, awaiter->num_reqs_, awaiter->opts_, + [](const FSReadRequest* reqs, size_t n_reqs, void* cb_arg) { + FSReadRequest* read_reqs = static_cast(cb_arg); + if (read_reqs != reqs) { + for (size_t idx = 0; idx < n_reqs; idx++) { + read_reqs[idx].status = reqs[idx].status; + read_reqs[idx].result = reqs[idx].result; + } + } + }, + (void**)&awaiter->read_reqs_, (void**)&awaiter->io_handle_, + &num_io_handles, &awaiter->del_fn_[0], + /*aligned_buf=*/nullptr); + if (!s.ok()) { + assert(num_io_handles == 0); + // For any non-ok status, the FileSystem will not call the callback + // So let's update the status ourselves assuming the whole batch failed. + for (size_t i = 0; i < awaiter->num_reqs_; ++i) { awaiter->read_reqs_[i].status = s; } } + assert(num_io_handles <= awaiter->num_reqs_); + awaiter->io_handle_.resize(num_io_handles); + awaiter->del_fn_.resize(num_io_handles); return true; } @@ -42,33 +52,43 @@ void AsyncFileReader::Wait() { if (!head_) { return; } - ReadAwaiter* waiter; + + // TODO: No need to copy if we have 1 awaiter. + // Poll API seems to encourage inefficiency. std::vector io_handles; - IOStatus s; io_handles.reserve(num_reqs_); - waiter = head_; + + ReadAwaiter* waiter; do { - for (size_t i = 0; i < waiter->num_reqs_; ++i) { + waiter = head_; + for (size_t i = 0; i < waiter->io_handle_.size(); ++i) { if (waiter->io_handle_[i]) { io_handles.push_back(waiter->io_handle_[i]); } } } while (waiter != tail_ && (waiter = waiter->next_)); + + IOStatus s = IOStatus::OK(); if (io_handles.size() > 0) { StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS); s = fs_->Poll(io_handles, io_handles.size()); } + do { waiter = head_; head_ = waiter->next_; - for (size_t i = 0; i < waiter->num_reqs_; ++i) { + for (size_t i = 0; i < waiter->io_handle_.size(); ++i) { if (waiter->io_handle_[i] && waiter->del_fn_[i]) { waiter->del_fn_[i](waiter->io_handle_[i]); } - if (waiter->read_reqs_[i].status.ok() && !s.ok()) { - // Override the request status with the Poll error - waiter->read_reqs_[i].status = s; + } + if (!s.ok()) { + for (size_t i = 0; i < waiter->num_reqs_; ++i) { + if (waiter->read_reqs_[i].status.ok()) { + // Override the request status with the Poll error + waiter->read_reqs_[i].status = s; + } } } waiter->awaiting_coro_.resume();