Skip to content

Commit

Permalink
Fix ChunkMessagePackEventStreamer's invalid argument
Browse files Browse the repository at this point in the history
The argument `unpacker` of `ChunkMessagePackEventStreamer.each`
seems to have been added in order to match the feature with
`EventStream` at c6c6c03.

However, that previous implementation at that point does not
work as expected.
It has never causes any issues just because the argument was not
used at all.

It could be implemented so that this argument is used, but given
that it has not been used so far, there is little need for it.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Jul 10, 2023
1 parent aaa40f7 commit 5121054
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
8 changes: 6 additions & 2 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,15 @@ def ensure_decompressed!
end

module ChunkMessagePackEventStreamer
# chunk.extend(ChunkEventStreamer)
# chunk.extend(ChunkMessagePackEventStreamer)
# => chunk.each{|time, record| ... }
def each(unpacker: nil, &block)
# Note: If need to use `unpacker`, then implement it,
# e.g., `unpacker.feed_each(io.read, &block)` (Not tested)
raise NotImplementedError, "'unpacker' argument is not implemented." if unpacker

open do |io|
(unpacker || Fluent::MessagePackFactory.msgpack_unpacker(io)).each(&block)
Fluent::MessagePackFactory.msgpack_unpacker(io).each(&block)
end
nil
end
Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_buffer_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ class BufferChunkTest < Test::Unit::TestCase
assert chunk.respond_to?(:msgpack_each)
end

test 'unpacker arg is not implemented for ChunkMessagePackEventStreamer' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
chunk.extend Fluent::ChunkMessagePackEventStreamer

unpacker = Fluent::MessagePackFactory.thread_local_msgpack_unpacker

assert_raise(NotImplementedError){ chunk.each(unpacker: unpacker) }
assert_raise(NotImplementedError){ chunk.msgpack_each(unpacker: unpacker) }
end

test 'some methods raise ArgumentError with an option of `compressed: :gzip` and without extending Compressble`' do
meta = Object.new
chunk = Fluent::Plugin::Buffer::Chunk.new(meta)
Expand Down

0 comments on commit 5121054

Please sign in to comment.