Skip to content

Commit

Permalink
iostream: extended read_exactly2 interface with alignment
Browse files Browse the repository at this point in the history
For posix-stack: minimize system calls with prefetch, and minimize
unnecessary memory copies.

For native-stack: minimize unnecessary memory copies.

TODO: compatible but may not be optimal
- tls_connected_socket_impl
- file_data_source_impl
- loopback_data_source_impl
- packet_data_source

Signed-off-by: Yingxin <[email protected]>
  • Loading branch information
cyx1231st committed Jun 10, 2019
1 parent 0f1c501 commit 559d64a
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 0 deletions.
58 changes: 58 additions & 0 deletions include/seastar/core/iostream-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,64 @@ input_stream<CharType>::read_exactly(size_t n) {
}
}

/// Continuation helper for read_exactly2().
///
/// Fills the range [completed, n) of \c out by asking the data source to
/// write straight into it via get_direct(), looping until \c n characters
/// have been gathered.
///
/// \param n          total number of characters the caller asked for
/// \param out        destination buffer; its first \c completed characters are already valid
/// \param completed  number of characters already placed in \c out
/// \return \c out once it holds \c n characters, or a default-constructed
///         buffer if the source reports end-of-stream first (in which case
///         \c _eof is set).
///
/// Precondition (asserted): the stream's own buffer has nothing available.
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly_part_direct(size_t n, tmp_buf out, size_t completed) {
    assert(!available() && "Still available buffer left");
    if (completed == n) {
        return make_ready_future<tmp_buf>(std::move(out));
    }

    // Compute the destination before `out` is moved into the continuation.
    auto* write_pos = out.get_write() + completed;
    const size_t still_missing = n - completed;

    return _fd.get_direct(write_pos, still_missing)
    .then([this, n, out = std::move(out), completed] (size_t size, tmp_buf prefetch_buf) mutable {
        // A zero-length read is treated as end-of-stream.
        if (size == 0) {
            assert(prefetch_buf.empty() && "EOF should not prefetch");
            _eof = true;
            return make_ready_future<tmp_buf>();
        }

        const auto read_offset = completed + size;
        const bool has_prefetch = bool(prefetch_buf);
        if (!has_prefetch) {
            // no prefetch, and buffer maybe too small: keep reading
            return this->read_exactly_part_direct(n, std::move(out), read_offset);
        }

        // with prefetch: the surplus becomes the stream's buffered data
        assert(read_offset == n && "Prefetch should only happen with a complete read");
        _buf = std::move(prefetch_buf);
        return make_ready_future<tmp_buf>(std::move(out));
    });
}

/// Reads exactly \c n characters, returning them in a buffer whose start
/// address honours \c alignment.
///
/// \param n          number of characters to read
/// \param alignment  required alignment of the returned buffer, in bytes
///                   (same unit as DEFAULT_ALIGNMENT, i.e. alignof(void*)).
///                   0 is treated as "no constraint".
/// \return a buffer holding \c n characters. If the underlying source hits
///         end-of-stream first, the result is whatever
///         read_exactly_part_direct() yields in that case.
///
/// Strategy:
///  - if the already-buffered data starts at a suitably aligned address and
///    holds at least \c n characters, hand it out without copying;
///  - otherwise allocate a fresh aligned buffer, copy whatever is buffered
///    into it and read the remainder directly from the data source.
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly2(size_t n, uint16_t alignment) {
    // `alignment` is a byte count, so test the address against it directly.
    // (Shifting by `alignment - 1` would demand a far stricter alignment than
    // requested and is undefined for alignment == 0.)
    const uintptr_t wanted_alignment = alignment ? alignment : 1;
    const auto buffered_addr = reinterpret_cast<uintptr_t>(_buf.begin());
    if (buffered_addr % wanted_alignment == 0) {
        // alignment suffices
        if (_buf.size() == n) {
            // easy case: steal buffer, return to caller
            return make_ready_future<tmp_buf>(std::move(_buf));
        } else if (_buf.size() > n) {
            // buffer large enough, share it with caller
            auto front = _buf.share(0, n);
            _buf.trim_front(n);
            return make_ready_future<tmp_buf>(std::move(front));
        }
        // buffer not large enough, we need to create one
    }

    // no chance to reuse the memory space of the prefetched buffer
    // Never allocate with less than the default alignment. The explicit
    // <size_t> makes std::max bind to temporaries instead of taking a
    // reference to the static constexpr member.
    auto out = tmp_buf::aligned(std::max<size_t>(alignment, DEFAULT_ALIGNMENT), n);
    auto len_needs_copy = std::min(available(), n);
    std::copy(_buf.get(), _buf.get() + len_needs_copy, out.get_write());
    _buf.trim_front(len_needs_copy);
    if (len_needs_copy == n) {
        // ok, prefetched data alone satisfied the request
        return make_ready_future<tmp_buf>(std::move(out));
    } else {
        // the stream buffer is drained now; fetch the rest directly into `out`
        return read_exactly_part_direct(n, std::move(out), len_needs_copy);
    }
}

template <typename CharType>
template <typename Consumer>
GCC6_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
Expand Down
19 changes: 19 additions & 0 deletions include/seastar/core/iostream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ class data_source_impl {
public:
virtual ~data_source_impl() {}
virtual future<temporary_buffer<char>> get() = 0;
/// Reads into a caller-provided buffer.
///
/// Default implementation, built on get(): fetches the next chunk, copies at
/// most \c size bytes of it into \c buf and hands back the number of bytes
/// copied together with the uncopied remainder of the chunk (empty when the
/// chunk fitted entirely). Implementations that can read into user memory
/// with fewer copies should override this.
virtual future<size_t, temporary_buffer<char>> get_direct(char* buf, size_t size) {
    return get().then([buf, size] (auto chunk) {
        const size_t copied = std::min(chunk.size(), size);
        std::copy_n(chunk.get(), copied, buf);
        // whatever did not fit stays in `chunk` and is returned as prefetched data
        chunk.trim_front(copied);
        return make_ready_future<size_t, temporary_buffer<char>>(copied, std::move(chunk));
    });
}
virtual future<temporary_buffer<char>> skip(uint64_t n);
virtual future<> close() { return make_ready_future<>(); }
};
Expand All @@ -62,6 +74,10 @@ public:
data_source(data_source&& x) = default;
data_source& operator=(data_source&& x) = default;
future<temporary_buffer<char>> get() { return _dsi->get(); }
// Fills `buf` directly with at most `size` bytes by forwarding to the
// implementation's get_direct().
// The future resolves to the number of bytes actually written into `buf`
// and a buffer holding any extra (prefetched) data that was read beyond
// what fitted into `buf`; that buffer is empty when nothing was prefetched.
future<size_t, temporary_buffer<char>> get_direct(char* buf, size_t size) { return _dsi->get_direct(buf, size); }
future<temporary_buffer<char>> skip(uint64_t n) { return _dsi->skip(n); }
future<> close() { return _dsi->close(); }
};
Expand Down Expand Up @@ -214,6 +230,8 @@ public:
input_stream(input_stream&&) = default;
input_stream& operator=(input_stream&&) = default;
future<temporary_buffer<CharType>> read_exactly(size_t n);
// Default alignment used by read_exactly2(): the alignment of a pointer.
static constexpr uint16_t DEFAULT_ALIGNMENT = alignof(void*);
// Reads exactly n characters like read_exactly(), additionally taking the
// alignment wanted for the returned buffer. When a new buffer has to be
// allocated, it is aligned to max(alignment, DEFAULT_ALIGNMENT).
future<temporary_buffer<CharType>> read_exactly2(size_t n, uint16_t alignment = DEFAULT_ALIGNMENT);
template <typename Consumer>
GCC6_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
future<> consume(Consumer&& c);
Expand Down Expand Up @@ -256,6 +274,7 @@ public:
data_source detach() &&;
private:
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
// Helper of read_exactly2(): fills buf[completed, n) straight from the data
// source via get_direct(), repeating until n characters are present or the
// source reports end-of-stream.
future<temporary_buffer<CharType>> read_exactly_part_direct(size_t n, tmp_buf buf, size_t completed);
};

// Facilitates data buffering before it's handed over to data_sink.
Expand Down
1 change: 1 addition & 0 deletions include/seastar/net/posix-stack.hh
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public:
size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)),
_buf(make_temporary_buffer<char>(_buffer_allocator, buf_size)), _buf_size(buf_size) {}
future<temporary_buffer<char>> get() override;
// Requests larger than half of the internal buffer size are read straight
// into `buf` without prefetching; smaller ones use the buffered default
// implementation of data_source_impl.
future<size_t, temporary_buffer<char>> get_direct(char* buf, size_t size) override;
future<> close() override;
};

Expand Down
15 changes: 15 additions & 0 deletions src/net/posix-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,21 @@ posix_data_source_impl::get() {
});
}

// Reads up to `size` bytes into `buf`.
//
// Small requests (at most half of the internal buffer size) go through the
// default data_source_impl::get_direct(), which reads a whole chunk via get()
// and copies from it: one extra memory copy, but fewer system calls thanks to
// the prefetched remainder.
// Larger requests are read straight into `buf`; nothing is prefetched, so the
// second element of the result is an empty buffer.
future<size_t, temporary_buffer<char>>
posix_data_source_impl::get_direct(char* buf, size_t size) {
    const bool worth_prefetching = size <= _buf_size / 2;
    if (worth_prefetching) {
        return data_source_impl::get_direct(buf, size);
    }

    return _fd->read_some(buf, size).then([] (auto bytes_read) {
        return make_ready_future<size_t, temporary_buffer<char>>(
                bytes_read, temporary_buffer<char>());
    });
}

future<> posix_data_source_impl::close() {
_fd->shutdown(SHUT_RD);
return make_ready_future<>();
Expand Down

0 comments on commit 559d64a

Please sign in to comment.