Skip to content

Commit

Permalink
in_tail: Ensure to discard TailWatcher with missing target when follo…
Browse files Browse the repository at this point in the history
…w_inodes

For example, when a rotation process is slow, there is a small time lag
between moving and adding files.
There is a possibility that StatWatcher notifies too quickly to the
TailWatcher before the new file is moved to that target path.
From the TailWatcher, it appears as if the file is resurrected once
it disappeared.

In this case, `refresh_watcher` can't recognize it, so TailWatcher needs
to discard self correctly.
In the previous implementation, it was not done.
So it caused the handle leak and log duplication.
(Please check the added test-case)

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Jul 13, 2023
1 parent e120693 commit a91bbba
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 4 deletions.
23 changes: 19 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,17 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(tail_watcher, pe, new_inode)
# TODO we should use another callback for this.
# To supress impact to existing logics, limit the case to `@follow_inodes`.
# We may not need `@follow_inodes` condition.
if @follow_inodes && new_inode.nil?
# nil inode means the file disappeared, so we only need to stop it.
@tails.delete(tail_watcher.path)
# Detach should be done immediately because the file doesn't exist.
detach_watcher(tail_watcher, pe.read_inode)
return
end

path = tail_watcher.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")
Expand Down Expand Up @@ -881,10 +892,14 @@ def on_rotate(stat)

if watcher_needs_update
if @follow_inodes
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(self, @pe, stat.ino) if stat
# If stat is nil (file not present), NEED to stop and discard this watcher.
# When the file is disappeared but is resurrected soon, then `#refresh_watcher`
# can't recognize this TailWatcher needs to be stopped.
# This can happens when the file is rotated.
# If a notify comes before the new file for the path is created during rotation,
# then it appears as if the file was resurrected once it disappeared.
# Don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(self, @pe, stat&.ino)
else
# Permit to handle if stat is nil (file not present).
# If a file is mv-ed and a new file is created during
Expand Down
95 changes: 95 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2912,5 +2912,100 @@ def test_updateTW_after_refreshTW
},
)
end

def test_path_resurrection
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 5, timeout: 10) do
# Rotate
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
# TailWatcher(path: "tail.txt", inode: inode_0) detects `tail.txt` disappeared.
# Call `update_watcher` to stop and discard self.
# If not discarding, then it will be a orphan and cause leak and log duplication.
#
# This reproduces the case where the notify to TailWatcher comes before the new file for the path
# is created during rotation.
# (stat_watcher notifies faster than a new file is created)
# Overall, this is a rotation operation, but from the TailWatcher, it appears as if the file
# was resurrected once it disappeared.
sleep 1
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# Add new TailWatchers
# tail.txt: TailWatcher(path: "tail.txt", inode: inode_1)
# tail.txt: TailWatcher(path: "tail.txt1", inode: inode_0)
# NOTE: If not discarding the first TailWatcher on notify, this makes it a orphan because
# this overwrites the `@tails[tail.txt]` by adding TailWatcher(path: "tail.txt", inode: inode_1)
d.instance.refresh_watchers

# This does nothing.
# NOTE: If not discarding the first TailWatcher on notify, this add
# tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0)
# because the previous refresh_watcher overwrites `@tails[tail.txt]` and the inode_0 is lost.
# This would cause log duplication.
d.instance.refresh_watchers

# Append to the old file
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt1", "ab") {|f| f.puts "file1 log3"}

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file1 log3", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "0000000000000021", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end
end
end

0 comments on commit a91bbba

Please sign in to comment.