Skip to content

Commit

Permalink
Various Down::ChunkedIO improvements
Browse files Browse the repository at this point in the history
Instead of trying to optimize every path, we make sure to optimize only
the most important one, which is the one with an output buffer. This
makes the code a bit easier to reason about.

We also make sure that we're dealing with binary encoding all the way
until we need to return the result, and only then force to specified
encoding if it's different than binary.

These changes allow us to easily handle frozen chunks, which fixes #32.

While we're at it, we also greatly improve ChunkedIO#gets memory
allocation.
  • Loading branch information
janko committed Sep 26, 2019
1 parent dac1c35 commit 898004d
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 92 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
## HEAD

* Change `ChunkedIO#each_chunk` to return chunks in original encoding (@janko)

* Always return binary strings in `ChunkedIO#readpartial` (@janko)

* Handle frozen chunks in `Down::ChunkedIO` (@janko)

* Change `ChunkedIO#gets` to return lines in specified encoding (@janko)

* Halve memory allocation for `ChunkedIO#gets` (@janko)

* Halve memory allocation for `ChunkedIO#read` without arguments (@janko)

## 4.8.1 (2019-05-01)

* Make `ChunkedIO#read`/`#readpartial` with length always return strings in binary encoding (@janko)
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ source "https://rubygems.org"
gemspec

gem "pry"
gem "memory_profiler"
78 changes: 37 additions & 41 deletions lib/down/chunked_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,18 @@ def each_chunk
def read(length = nil, outbuf = nil)
fail IOError, "closed stream" if closed?

data = outbuf.to_s.clear.force_encoding(Encoding::BINARY)
remaining_length = length

begin
data = readpartial(remaining_length, outbuf)
data = data.dup unless outbuf
remaining_length = length - data.bytesize if length
rescue EOFError
end

until remaining_length == 0 || eof?
data << readpartial(remaining_length)
data << readpartial(remaining_length, buffer ||= String.new)
remaining_length = length - data.bytesize if length
end

data.to_s unless length && length > 0 && (data.nil? || data.empty?)
buffer.clear if buffer # deallocate string

data.force_encoding(@encoding) unless length
data unless data.empty? && length && length > 0
end

# Implements IO#gets semantics. Without arguments it retrieves lines of
Expand Down Expand Up @@ -108,27 +105,33 @@ def gets(separator_or_limit = $/, limit = nil)

separator = "\n\n" if separator.empty?

begin
data = readpartial(limit)
data = String.new

until data.include?(separator) || data.bytesize == limit || eof?
remaining_length = limit - data.bytesize if limit
data << readpartial(remaining_length, outbuf ||= String.new)
end
until data.include?(separator) || data.bytesize == limit || eof?
remaining_length = limit - data.bytesize if limit
data << readpartial(remaining_length, buffer ||= String.new)
end

line, extra = data.split(separator, 2)
line << separator if data.include?(separator)
buffer.clear if buffer # deallocate buffer

line, extra = data.split(separator, 2)
line << separator if data.include?(separator)

data.clear # deallocate data

if extra
if cache
cache.pos -= extra.to_s.bytesize
cache.pos -= extra.bytesize
else
@buffer = @buffer.to_s.prepend(extra.to_s)
if @buffer
@buffer.prepend(extra)
else
@buffer = extra
end
end
rescue EOFError
line = nil
end

line
line.force_encoding(@encoding) if line
end

# Implements IO#readpartial semantics. If there is any content readily
Expand All @@ -145,49 +148,42 @@ def gets(separator_or_limit = $/, limit = nil)
# where the value is replaced with retrieved content.
#
# Raises EOFError if end of file is reached. Raises IOError if closed.
def readpartial(length = nil, outbuf = nil)
def readpartial(maxlen = nil, outbuf = nil)
fail IOError, "closed stream" if closed?

data = outbuf.clear.force_encoding(@encoding) if outbuf
maxlen ||= 16*1024

return data.to_s if length == 0
data = cache.read(maxlen, outbuf) if cache && !cache.eof?
data ||= outbuf.to_s.clear

if cache && !cache.eof?
data = cache.read(length, outbuf)
data.force_encoding(@encoding)
end
return data if maxlen == 0

if @buffer.nil? && (data.nil? || data.empty?)
if @buffer.nil? && data.empty?
fail EOFError, "end of file reached" if chunks_depleted?
@buffer = retrieve_chunk
end

remaining_length = data && length ? length - data.bytesize : length
remaining_length = maxlen - data.bytesize

unless @buffer.nil? || remaining_length == 0
if remaining_length && remaining_length < @buffer.bytesize
if remaining_length < @buffer.bytesize
buffered_data = @buffer.byteslice(0, remaining_length)
@buffer = @buffer.byteslice(remaining_length..-1)
else
buffered_data = @buffer
@buffer = nil
end

if data
data << buffered_data
else
data = buffered_data
end
data << buffered_data

cache.write(buffered_data) if cache

buffered_data.clear unless buffered_data.equal?(data)
buffered_data.clear unless buffered_data.frozen?
end

@position += data.bytesize

data.force_encoding(Encoding::BINARY) if length
data
data.force_encoding(Encoding::BINARY)
end

# Implements IO#seek semantics.
Expand Down Expand Up @@ -303,7 +299,7 @@ def cache
def retrieve_chunk
chunk = @next_chunk
@next_chunk = chunks_fiber.resume
chunk.force_encoding(@encoding) if chunk
chunk
end

# Returns whether there is any content left to retrieve.
Expand Down
89 changes: 38 additions & 51 deletions test/chunked_io_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ def chunked_io(options = {})
assert_equal "d\n", io.gets
end

it "retrieves lines when non-rewindable with limit" do
io = chunked_io(chunks: ["a\n", "b\nc\nd", "\n"].each, rewindable: false)
assert_equal "a\n", io.gets(3)
assert_equal "b\n", io.gets(3)
assert_equal "c\n", io.gets(3)
assert_equal "d\n", io.gets(3)
end

it "accepts a different separator" do
io = chunked_io(chunks: ["a\r", "b\rc\rd", "\r"].each)
assert_equal "a\r", io.gets("\r")
Expand Down Expand Up @@ -326,6 +334,14 @@ def chunked_io(options = {})
assert_equal "\r", io.gets(nil, 3)
end

it "returns lines in sepecified encoding" do
io = chunked_io(chunks: ["ab", "c"], encoding: Encoding::UTF_8)
assert_equal Encoding::UTF_8, io.gets.encoding

io = chunked_io(chunks: ["ab", "c"])
assert_equal Encoding::BINARY, io.gets.encoding
end

it "returns nil when on EOF" do
io = chunked_io(chunks: ["a\n"].each)
io.gets
Expand All @@ -340,48 +356,7 @@ def chunked_io(options = {})
end

describe "#readpartial" do
describe "without arguments" do
it "reads the next chunk" do
io = chunked_io(chunks: ["ab", "c"].each)
assert_equal "ab", io.readpartial
assert_equal "c", io.readpartial
end

it "reads the remainder of a chunk" do
io = chunked_io(chunks: ["ab", "c"].each)
io.readpartial(1)
assert_equal "b", io.readpartial
assert_equal "c", io.readpartial
end

it "reads available data from cache" do
io = chunked_io(chunks: ["ab", "c"].each)
io.readpartial
io.rewind
assert_equal "ab", io.readpartial
assert_equal "c", io.readpartial
end

it "reads available data from cache and buffer" do
io = chunked_io(chunks: ["ab", "c"].each)
io.readpartial(1)
io.rewind
assert_equal "ab", io.readpartial
end

it "raises EOFError on eof" do
io = chunked_io(chunks: ["ab", "c"].each)
io.read
assert_raises(EOFError) { io.readpartial }
end

it "raises EOFError on zero chunks" do
io = chunked_io(chunks: [].each)
assert_raises(EOFError) { io.readpartial }
end
end

describe "with length" do
describe "with maxlen" do
it "reads specified number of bytes" do
io = chunked_io(chunks: ["ab", "c"].each)
assert_equal "a", io.readpartial(1)
Expand Down Expand Up @@ -419,6 +394,16 @@ def chunked_io(options = {})
assert_equal "ab", io.readpartial(4)
end

it "returns data in binary encoding" do
io = chunked_io(chunks: ["ab", "c"])
assert_equal Encoding::BINARY, io.readpartial(1).encoding
end

it "works with frozen chunks" do
io = chunked_io(chunks: ["ab".freeze, "c".freeze])
assert_equal Encoding::BINARY, io.readpartial(2).encoding
end

it "raises EOFError on eof" do
io = chunked_io(chunks: ["ab", "c"].each)
io.read
Expand Down Expand Up @@ -486,6 +471,16 @@ def chunked_io(options = {})
assert_equal "ab", io.readpartial(4, "")
end

it "returns data in binary encoding" do
io = chunked_io(chunks: ["ab", "c"])
assert_equal Encoding::BINARY, io.readpartial(1, "").encoding
end

it "works with frozen chunks" do
io = chunked_io(chunks: ["ab".freeze, "c".freeze])
assert_equal Encoding::BINARY, io.readpartial(2, "").encoding
end

it "raises EOFError on eof" do
io = chunked_io(chunks: ["ab", "c"].each)
io.read
Expand All @@ -505,7 +500,7 @@ def chunked_io(options = {})
it "raises IOError when closed" do
io = chunked_io(chunks: ["ab", "c"].each)
io.close
assert_raises(IOError) { io.readpartial }
assert_raises(IOError) { io.readpartial(4) }
end
end

Expand Down Expand Up @@ -602,14 +597,6 @@ def chunked_io(options = {})
assert_equal "", io.read
end

it "returns chunks in correct encoding" do
io = chunked_io(chunks: ["ab", "c"].each)
io.each_chunk { |chunk| assert_equal Encoding::BINARY, chunk.encoding }

io = chunked_io(chunks: ["ab", "c"].each, encoding: "utf-8")
io.each_chunk { |chunk| assert_equal Encoding::UTF_8, chunk.encoding }
end

it "raises IOError when closed" do
io = chunked_io(chunks: ["ab", "c"].each)
io.close
Expand Down

0 comments on commit 898004d

Please sign in to comment.