Skip to content

Commit

Permalink
Update pos even if line is skipped
Browse files Browse the repository at this point in the history
  • Loading branch information
been-zino committed Jun 9, 2024
1 parent f96d710 commit b7f5859
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 26 deletions.
16 changes: 15 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def update_watcher(tail_watcher, pe, new_inode)
if @follow_inodes && new_inode.nil?
# nil inode means the file disappeared, so we only need to stop it.
@tails.delete(tail_watcher.path)
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# Because of this problem, log duplication can occur during `rotate_wait`.
# Need to set `rotate_wait 0` for a workaround.
# Duplication will occur if `refresh_watcher` is called during the `rotate_wait`.
Expand Down Expand Up @@ -1012,6 +1012,7 @@ def initialize(from_encoding, encoding, log, max_line_size=nil)
@eol = "\n".encode(from_encoding).freeze
@max_line_size = max_line_size
@was_long_line = false
@has_skipped_line = false
@log = log
end

Expand Down Expand Up @@ -1047,6 +1048,7 @@ def convert(s)

def read_lines(lines)
idx = @buffer.index(@eol)
@has_skipped_line = false

until idx.nil?
# Using freeze and slice is faster than slice!
Expand All @@ -1064,6 +1066,7 @@ def read_lines(lines)
@log.warn "received line length is longer than #{@max_line_size}"
@log.debug("skipped line: ") { convert(rbuf).chomp }
@was_long_line = false
@has_skipped_line = true
next
end

Expand All @@ -1077,12 +1080,17 @@ def read_lines(lines)
if is_long_line
@buffer.clear
@was_long_line = true
@has_skipped_line = true
end
end

def bytesize
@buffer.bytesize
end

def has_skipped_line?
@has_skipped_line
end
end

class IOHandler
Expand Down Expand Up @@ -1181,6 +1189,7 @@ def handle_notify
with_io do |io|
begin
read_more = false
has_skipped_line = false

if !io.nil? && @lines.empty?
begin
Expand All @@ -1195,6 +1204,7 @@ def handle_notify

n_lines_before_read = @lines.size
@fifo.read_lines(@lines)
has_skipped_line = @fifo.has_skipped_line?
group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read)

group_watcher_limit = group_watcher&.limit_lines_reached?(@path)
Expand All @@ -1217,6 +1227,10 @@ def handle_notify
end
end

if @lines.empty? && has_skipped_line
@watcher.pe.update_pos(io.pos - @fifo.bytesize)
end

unless @lines.empty?
if @receive_lines.call(@lines, @watcher)
@watcher.pe.update_pos(io.pos - @fifo.bytesize)
Expand Down
48 changes: 23 additions & 25 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,36 +146,34 @@ def create_watcher
assert_equal 5, returned_lines[0].size
assert_equal 3, returned_lines[1].size
end
end

test 'call receive_lines some times when long line(more than 8192) and max_line_size is 4096' do
t = 'line' * (8192 / 8)
text = "#{t}\n" * 8
t = 'line' * 8
text += "#{t}\n" * 8
test 'does not call receive_lines when line_size exceeds max_line_size' do
t = 'x' * (8192)
text = "#{t}\n"

@file.write(text)
@file.close
max_line_size = 8192

update_pos = 0
@file.write(text)
@file.close

watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos {0}
stub(pe).update_pos { |val| update_pos = val }
pe
end
update_pos = 0

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, max_line_size: 4096, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos {0}
stub(pe).update_pos { |val| update_pos = val }
pe
end

r.on_notify
assert_equal text.bytesize, update_pos
assert_equal 1, returned_lines.size
assert_equal 8, returned_lines[0].size
returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 1000, read_bytes_limit_per_second: -1, max_line_size: max_line_size, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end

r.on_notify
assert_equal text.bytesize, update_pos
assert_equal 0, returned_lines.size
end
end

0 comments on commit b7f5859

Please sign in to comment.