Skip to content

Commit

Permalink
Merge pull request #4237 from daipom/in_tail-prevent-wrongly-unwatchi…
Browse files Browse the repository at this point in the history
…ng-2

in_tail: prevent wrongly unwatching with follow_inodes
  • Loading branch information
ashie authored Jul 13, 2023
2 parents e120693 + 51848da commit d05f592
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 33 deletions.
23 changes: 16 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,30 @@ def existence_path
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug {
target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
"tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
}

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
if !@follow_inodes
need_unwatch_in_stop_watchers = true
else
# When using @follow_inodes, need this to unwatch the rotated old inode when it disappears.
# After `update_watcher` detaches an old TailWatcher, the inode is lost from the `@tails`.
# So that inode can't be contained in `removed_hash`, and can't be unwatched by `stop_watchers`.
#
# This logic may work for `@follow_inodes false` too.
# Just limiting the case to supress the impact to existing logics.
@pf&.unwatch_removed_targets(target_paths_hash)
need_unwatch_in_stop_watchers = false
end

removed_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
stop_watchers(removed_hash, unwatched: need_unwatch_in_stop_watchers) unless removed_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
@startup = false if @startup
end
Expand Down Expand Up @@ -502,10 +515,6 @@ def update_watcher(tail_watcher, pe, new_inode)
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
tail_watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
Expand Down
18 changes: 15 additions & 3 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ def [](target_info)
}
end

def unwatch_removed_targets(existing_targets)
@map.reject { |key, entry|
existing_targets.key?(key)
}.each_key { |key|
unwatch_key(key)
}
end

def unwatch(target_info)
if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
entry.update_pos(UNWATCHED_POSITION)
end
unwatch_key(@follow_inodes ? target_info.ino : target_info.path)
end

def load(existing_targets = nil)
Expand Down Expand Up @@ -118,6 +124,12 @@ def try_compact

private

def unwatch_key(key)
if (entry = @map.delete(key))
entry.update_pos(UNWATCHED_POSITION)
end
end

def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries
Expand Down
32 changes: 31 additions & 1 deletion test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class IntailPositionFileTest < Test::Unit::TestCase
"valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
"inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}
TEST_CONTENT_INODES = {
1 => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
0 => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}

def write_data(f, content)
f.write(content)
Expand Down Expand Up @@ -221,7 +225,7 @@ def follow_inodes_block
end

sub_test_case '#unwatch' do
test 'deletes entry by path' do
test 'unwatch entry by path' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)
inode1 = File.stat(@file).ino
Expand All @@ -239,6 +243,32 @@ def follow_inodes_block

assert_not_equal p1, p2
end

test 'unwatch entries by inode' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_INODES, logger: $log)

existing_targets = TEST_CONTENT_INODES.select do |inode, target_info|
inode == 1
end
pe_to_unwatch = pf[TEST_CONTENT_INODES[0]]

pf.unwatch_removed_targets(existing_targets)

assert_equal(
{
map_keys: [TEST_CONTENT_INODES[1].ino],
unwatched_pe_pos: Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION,
},
{
map_keys: pf.instance_variable_get(:@map).keys,
unwatched_pe_pos: pe_to_unwatch.read_pos,
}
)

unwatched_pe_retaken = pf[TEST_CONTENT_INODES[0]]
assert_not_equal pe_to_unwatch, unwatched_pe_retaken
end
end

sub_test_case 'FilePositionEntry' do
Expand Down
31 changes: 9 additions & 22 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2704,22 +2704,14 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down Expand Up @@ -2802,7 +2794,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
Expand Down Expand Up @@ -2861,8 +2854,9 @@ def test_updateTW_after_refreshTW
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# However, it is already added in `refresh_watcher`, so `update_watcher` doesn't create the new TailWatcher.
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

Expand All @@ -2886,22 +2880,15 @@ def test_updateTW_after_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down

0 comments on commit d05f592

Please sign in to comment.