diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh index 5112a918e76..535c699fe30 100644 --- a/include/seastar/core/iostream-impl.hh +++ b/include/seastar/core/iostream-impl.hh @@ -192,6 +192,64 @@ input_stream::read_exactly(size_t n) { } } +template +future> +input_stream::read_exactly_part_direct(size_t n, tmp_buf out, size_t completed) { + assert(!available() && "Still available buffer left"); + if (completed == n) { + return make_ready_future(std::move(out)); + } + + return _fd.get_direct(out.get_write() + completed, n - completed) + .then([this, n, out = std::move(out), completed] (size_t size, tmp_buf prefetch_buf) mutable { + if (size == 0) { + assert(prefetch_buf.empty() && "EOF should not prefetch"); + _eof = true; + return make_ready_future(); + } else { + auto read_offset = completed + size; + if (bool(prefetch_buf)) { + // with prefetch + assert(read_offset == n && "Prefetch should only happen with a complete read"); + _buf = std::move(prefetch_buf); + return make_ready_future(std::move(out)); + } else { + // no prefetch, and buffer maybe too small + return this->read_exactly_part_direct(n, std::move(out), read_offset); + } + } + }); +} + +template +future> +input_stream::read_exactly2(size_t n, uint16_t alignment) { + if (reinterpret_cast(_buf.begin()) % (1ul << (alignment - 1)) == 0) { + // alignment suffices + if (_buf.size() == n) { + // easy case: steal buffer, return to caller + return make_ready_future(std::move(_buf)); + } else if (_buf.size() > n) { + // buffer large enough, share it with caller + auto front = _buf.share(0, n); + _buf.trim_front(n); + return make_ready_future(std::move(front)); + } + // buffer not large enough, we need to create one + } + // no chance to reuse the memory space of the prefetched buffer + auto out = tmp_buf::aligned(std::max(alignment, DEFAULT_ALIGNMENT), n); + auto len_needs_copy = std::min(available(), n); + std::copy(_buf.get(), _buf.get() + len_needs_copy, out.get_write()); + _buf.trim_front(len_needs_copy); + if (len_needs_copy == n) { + // ok, prefetched + return make_ready_future(std::move(out)); + } else { + return read_exactly_part_direct(n, std::move(out), len_needs_copy); + } +} + template template GCC6_CONCEPT(requires InputStreamConsumer || ObsoleteInputStreamConsumer) diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh index cd7cfbe05d9..9f537433237 100644 --- a/include/seastar/core/iostream.hh +++ b/include/seastar/core/iostream.hh @@ -48,6 +48,18 @@ class data_source_impl { public: virtual ~data_source_impl() {} virtual future> get() = 0; + virtual future> get_direct(char* buf, size_t size) { + // the default implementation: + // need to override if the concrete data_source_impl + // can work with user provided buffer pointer with less copy. + return get().then([buf, size] (auto read_buf) mutable { + auto len_needs_copy = std::min(read_buf.size(), size); + std::copy(read_buf.get_write(), read_buf.get_write() + len_needs_copy, buf); + read_buf.trim_front(len_needs_copy); + return make_ready_future>( + len_needs_copy, std::move(read_buf)); + }); + } virtual future> skip(uint64_t n); virtual future<> close() { return make_ready_future<>(); } }; @@ -62,6 +74,10 @@ public: data_source(data_source&& x) = default; data_source& operator=(data_source&& x) = default; future> get() { return _dsi->get(); } + // fill the buf directly within the size boundary. + // return the number of bytes actually read, and if the direct buf is fulfilled, + // also return the prefetched buffer. + future> get_direct(char* buf, size_t size) { return _dsi->get_direct(buf, size); } future> skip(uint64_t n) { return _dsi->skip(n); } future<> close() { return _dsi->close(); } }; @@ -214,6 +230,8 @@ public: input_stream(input_stream&&) = default; input_stream& operator=(input_stream&&) = default; future> read_exactly(size_t n); + static constexpr uint16_t DEFAULT_ALIGNMENT = alignof(void*); + future> read_exactly2(size_t n, uint16_t alignment = DEFAULT_ALIGNMENT); template GCC6_CONCEPT(requires InputStreamConsumer || ObsoleteInputStreamConsumer) future<> consume(Consumer&& c); @@ -256,6 +274,7 @@ public: data_source detach() &&; private: future> read_exactly_part(size_t n, tmp_buf buf, size_t completed); + future> read_exactly_part_direct(size_t n, tmp_buf buf, size_t completed); }; // Facilitates data buffering before it's handed over to data_sink. diff --git a/include/seastar/net/posix-stack.hh b/include/seastar/net/posix-stack.hh index 803f6db6266..ad97f3912c6 100644 --- a/include/seastar/net/posix-stack.hh +++ b/include/seastar/net/posix-stack.hh @@ -112,6 +112,7 @@ public: size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)), _buf(make_temporary_buffer(_buffer_allocator, buf_size)), _buf_size(buf_size) {} future> get() override; + future> get_direct(char* buf, size_t size) override; future<> close() override; }; diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc index 758deb0b32b..9c3d397c1c0 100644 --- a/src/net/posix-stack.cc +++ b/src/net/posix-stack.cc @@ -325,6 +325,21 @@ posix_data_source_impl::get() { }); } +future> +posix_data_source_impl::get_direct(char* buf, size_t size) { + if (size > _buf_size / 2) { + // this was a large read, we don't prefetch + return _fd->read_some(buf, size).then([] (auto read_size) { + return make_ready_future>( + read_size, temporary_buffer()); + }); + } else { + // read with prefetch, but with extra memory copy, + // because we prefer less system calls. + return data_source_impl::get_direct(buf, size); + } +} + future<> posix_data_source_impl::close() { _fd->shutdown(SHUT_RD); return make_ready_future<>();