Skip to content

Commit

Permalink
in_tail: prevent wrongly unwatching with follow_inodes
Browse files Browse the repository at this point in the history
We must not unwatch targets that still exist.
If unwatching an existing target, it causes log duplication.

In the existing implementation, `update_watcher` needs to unwatch
the old TailWatcher since `refresh_watcher` can't unwatch it when
`update_watcher` is called first.
It is because `update_watcher` discards the old TailWatcher and
`refresh_watcher` can't recognize the old inode is disappeared.

However, it can wrongly unwatch an existing inode because the
old inode may still exist.
(See the diff of test cases.)

Thus, we need a new mechanism to correctly unwatch targets.

This fix is based on the idea that we should unwatch based on
directly on PositionFile's data.

Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Takuro Ashie <[email protected]>
  • Loading branch information
daipom and ashie committed Jul 12, 2023
1 parent e120693 commit d9bfe4a
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 57 deletions.
21 changes: 6 additions & 15 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,19 @@ def existence_path
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug {
target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
"tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
}

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
@pf&.unwatch_removed_targets(target_paths_hash)

removed_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
stop_watchers(removed_hash, immediate: false) unless removed_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
@startup = false if @startup
end
Expand Down Expand Up @@ -454,7 +456,7 @@ def start_watchers(targets_info)
}
end

def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
def stop_watchers(targets_info, immediate: false, remove_watcher: true)
targets_info.each_value { |target_info|
remove_path_from_group_watcher(target_info.path)

Expand All @@ -464,7 +466,6 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
tw = @tails[target_info.path]
end
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, target_info.ino, false)
else
Expand Down Expand Up @@ -502,10 +503,6 @@ def update_watcher(tail_watcher, pe, new_inode)
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
tail_watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
Expand Down Expand Up @@ -536,11 +533,6 @@ def detach_watcher(tw, ino, close_io = true)
tw.detach(@shutdown_start_time)

tw.close if close_io

if tw.unwatched && @pf
target_info = TargetInfo.new(tw.path, ino)
@pf.unwatch(target_info)
end
end

def throttling_is_enabled?(tw)
Expand Down Expand Up @@ -780,7 +772,6 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
attr_reader :path, :ino
attr_reader :pe
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
attr_reader :watchers
attr_accessor :group_watcher

Expand Down
16 changes: 12 additions & 4 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ def [](target_info)
}
end

def unwatch(target_info)
if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
entry.update_pos(UNWATCHED_POSITION)
end
def unwatch_removed_targets(existing_targets)
@map.reject { |key, entry|
existing_targets.key?(key)
}.each_key { |key|
unwatch(key)
}
end

def load(existing_targets = nil)
Expand Down Expand Up @@ -118,6 +120,12 @@ def try_compact

private

def unwatch(key)
if (entry = @map.delete(key))
entry.update_pos(UNWATCHED_POSITION)
end
end

def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries
Expand Down
69 changes: 53 additions & 16 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class IntailPositionFileTest < Test::Unit::TestCase
"valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
"inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}
TEST_CONTENT_INODES = {
1 => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
0 => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}

def write_data(f, content)
f.write(content)
Expand Down Expand Up @@ -99,7 +103,7 @@ def follow_inodes_block
pf[target_info3]

target_info1_2 = Fluent::Plugin::TailInput::TargetInfo.new('path1', 1234)
pf.unwatch(target_info1_2)
pf.send(:unwatch, target_info1_2.path)

pf.try_compact

Expand All @@ -111,8 +115,8 @@ def follow_inodes_block

target_info2_2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', 1235)
target_info3_2 = Fluent::Plugin::TailInput::TargetInfo.new('path3', 1236)
pf.unwatch(target_info2_2)
pf.unwatch(target_info3_2)
pf.send(:unwatch, target_info2_2.path)
pf.send(:unwatch, target_info3_2.path)
@file.seek(0)
lines = @file.readlines
assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0]
Expand Down Expand Up @@ -221,23 +225,56 @@ def follow_inodes_block
end

sub_test_case '#unwatch' do
test 'deletes entry by path' do
test 'unwatch entries by path' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)
inode1 = File.stat(@file).ino
target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', inode1)
p1 = pf[target_info1]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, logger: $log)

pf.unwatch(target_info1)
assert_equal p1.read_pos, Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION
existing_targets = TEST_CONTENT_PATHS.select do |path, target_info|
path == "valid_path"
end
pe_to_unwatch = pf[TEST_CONTENT_PATHS["inode23bit"]]

pf.unwatch_removed_targets(existing_targets)

assert_equal(
{
map_keys: ["valid_path"],
unwatched_pe_pos: Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION,
},
{
map_keys: pf.instance_variable_get(:@map).keys,
unwatched_pe_pos: pe_to_unwatch.read_pos,
}
)

unwatched_pe_retaken = pf[TEST_CONTENT_PATHS["inode23bit"]]
assert_not_equal pe_to_unwatch, unwatched_pe_retaken
end

inode2 = File.stat(@file).ino
target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', inode2)
p2 = pf[target_info2]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p2.class
test 'unwatch entries by inode' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_INODES, logger: $log)

assert_not_equal p1, p2
existing_targets = TEST_CONTENT_INODES.select do |inode, target_info|
inode == 1
end
pe_to_unwatch = pf[TEST_CONTENT_INODES[0]]

pf.unwatch_removed_targets(existing_targets)

assert_equal(
{
map_keys: [TEST_CONTENT_INODES[1].ino],
unwatched_pe_pos: Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION,
},
{
map_keys: pf.instance_variable_get(:@map).keys,
unwatched_pe_pos: pe_to_unwatch.read_pos,
}
)

unwatched_pe_retaken = pf[TEST_CONTENT_INODES[0]]
assert_not_equal pe_to_unwatch, unwatched_pe_retaken
end
end

Expand Down
31 changes: 9 additions & 22 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2704,22 +2704,14 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down Expand Up @@ -2802,7 +2794,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
Expand Down Expand Up @@ -2861,8 +2854,9 @@ def test_updateTW_after_refreshTW
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# However, it is already added in `refresh_watcher`, so `update_watcher` doesn't create the new TailWatcher.
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

Expand All @@ -2886,22 +2880,15 @@ def test_updateTW_after_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down

0 comments on commit d9bfe4a

Please sign in to comment.