Skip to content

Commit

Permalink
Switch to sync default implementation of AsyncMultiRead.
Browse files Browse the repository at this point in the history
  • Loading branch information
dpetrov4 committed Dec 21, 2023
1 parent 0ccbb7e commit 6700da7
Showing 1 changed file with 17 additions and 33 deletions.
50 changes: 17 additions & 33 deletions include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,15 @@ class FileSystem : public Customizable {
//
// Default implementation is to return IOStatus::OK.

virtual IOStatus Poll(std::vector<void*>& /*io_handles*/,
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t /*min_completions*/) {
// RocksDB-Cloud contribution begin
if (!io_handles.empty()) {
// Default implementations on async operations do not return any
// io_handles. If you override them, you need to override Poll as well.
return IOStatus::NotSupported("Poll");
}
// RocksDB-Cloud contribution end
return IOStatus::OK();
}

Expand Down Expand Up @@ -950,44 +957,21 @@ class FSRandomAccessFile {
// request and result and status fields are output parameter set by underlying
// FileSystem. The data should always be read into scratch field.
//
// Default implementation delegates to ReadAsync for each request.
// Default implementation is syncrhonous and delegates to MultiRead.
virtual IOStatus MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
IODebugContext* dbg) {
void** /*io_handles*/, size_t* num_io_handles,
IOHandleDeleter* /*del_fns*/, IODebugContext* dbg) {
assert(*num_io_handles == num_reqs);
*num_io_handles = 0;
// Counter that we use keep track of how many individual async reads are
// still in progress.
std::atomic<size_t> in_flight = 0;
for (size_t idx = 0; idx < num_reqs; idx++) {
auto& req = reqs[idx];
auto local_cb = [&in_flight](const FSReadRequest&, void*) {
// We are done with this read, decrement the counter and signal.
in_flight--;
in_flight.notify_one();
};
auto status = ReadAsync(req, opts, local_cb, nullptr, &io_handles[idx],
&del_fns[idx], dbg);
if (status != IOStatus::OK()) {
// Delete in-progress IO-handles.
for (size_t k = 0; k < *num_io_handles; k++) {
if (io_handles[k] && del_fns[k]) {
del_fns[k](io_handles[k]);
io_handles[k] = nullptr;
}
}
*num_io_handles = 0;
return status;
}
(*num_io_handles)++;
in_flight++;
}
for (auto cv = in_flight.load(); cv != 0;
in_flight.wait(cv), cv = in_flight.load()) {

auto status = MultiRead(reqs, num_reqs, opts, dbg);
if (!status.ok()) {
return status;
}
// the operation is completed, call the 'op_cb'

// the operation has completed successfully, execute callbacks
cb(reqs, num_reqs, cb_arg);
return IOStatus::OK();
}
Expand Down

0 comments on commit 6700da7

Please sign in to comment.