From 51848da431de9dd84c2528af4143a01a299d047e Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 12 Jul 2023 18:00:11 +0900 Subject: [PATCH] in_tail: prevent wrongly unwatching with follow_inodes We must not unwatch targets that still exist. Unwatching an existing target causes log duplication. In the existing implementation, `update_watcher` needs to unwatch the old TailWatcher since `refresh_watcher` can't unwatch it when `update_watcher` is called first. This is because `update_watcher` discards the old TailWatcher and `refresh_watcher` can't recognize that the old inode has disappeared. However, it can wrongly unwatch an existing inode because the old inode may still exist. (See the diff of test cases.) Thus, we need a new mechanism to correctly unwatch targets when follow_inodes is enabled. This fix is based on the idea that we should unwatch based directly on PositionFile's data. Signed-off-by: Daijiro Fukuda Co-authored-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 23 +++++++++++----- lib/fluent/plugin/in_tail/position_file.rb | 18 ++++++++++-- test/plugin/in_tail/test_position_file.rb | 32 +++++++++++++++++++++- test/plugin/test_in_tail.rb | 31 ++++++--------------- 4 files changed, 71 insertions(+), 33 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 09da75fcb1..a58e6b39d9 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -370,17 +370,30 @@ def existence_path def refresh_watchers target_paths_hash = expand_paths existence_paths_hash = existence_path - + log.debug { target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",") existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",") "tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}" } - unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)} + if !@follow_inodes + need_unwatch_in_stop_watchers = 
true + else + # When using @follow_inodes, need this to unwatch the rotated old inode when it disappears. + # After `update_watcher` detaches an old TailWatcher, the inode is lost from the `@tails`. + # So that inode can't be contained in `removed_hash`, and can't be unwatched by `stop_watchers`. + # + # This logic may work for `@follow_inodes false` too. + # Just limiting the case to suppress the impact on existing logic. + @pf&.unwatch_removed_targets(target_paths_hash) + need_unwatch_in_stop_watchers = false + end + + removed_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)} added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)} - stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty? + stop_watchers(removed_hash, unwatched: need_unwatch_in_stop_watchers) unless removed_hash.empty? start_watchers(added_hash) unless added_hash.empty? @startup = false if @startup end @@ -502,10 +515,6 @@ def update_watcher(tail_watcher, pe, new_inode) new_target_info = TargetInfo.new(path, new_inode) if @follow_inodes - # When follow_inodes is true, it's not cleaned up by refresh_watcher. - # So it should be unwatched here explicitly. - tail_watcher.unwatched = true - new_position_entry = @pf[new_target_info] # If `refresh_watcher` find the new file before, this will not be zero. # In this case, only we have to do is detaching the current tail_watcher. diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index bc87697431..fb524a9c54 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -53,10 +53,16 @@ def [](target_info) } end + def unwatch_removed_targets(existing_targets) + @map.reject { |key, entry| + existing_targets.key?(key) + }.each_key { |key| + unwatch_key(key) + } + end + def unwatch(target_info) - if (entry = @map.delete(@follow_inodes ? 
target_info.ino : target_info.path)) - entry.update_pos(UNWATCHED_POSITION) - end + unwatch_key(@follow_inodes ? target_info.ino : target_info.path) end def load(existing_targets = nil) @@ -118,6 +124,12 @@ def try_compact private + def unwatch_key(key) + if (entry = @map.delete(key)) + entry.update_pos(UNWATCHED_POSITION) + end + end + def compact(existing_targets = nil) @file_mutex.synchronize do entries = fetch_compacted_entries diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 22ce4629f8..af692fdee5 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -26,6 +26,10 @@ class IntailPositionFileTest < Test::Unit::TestCase "valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1), "inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0), } + TEST_CONTENT_INODES = { + 1 => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1), + 0 => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0), + } def write_data(f, content) f.write(content) @@ -221,7 +225,7 @@ def follow_inodes_block end sub_test_case '#unwatch' do - test 'deletes entry by path' do + test 'unwatch entry by path' do write_data(@file, TEST_CONTENT) pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log) inode1 = File.stat(@file).ino @@ -239,6 +243,32 @@ def follow_inodes_block assert_not_equal p1, p2 end + + test 'unwatch entries by inode' do + write_data(@file, TEST_CONTENT) + pf = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_INODES, logger: $log) + + existing_targets = TEST_CONTENT_INODES.select do |inode, target_info| + inode == 1 + end + pe_to_unwatch = pf[TEST_CONTENT_INODES[0]] + + pf.unwatch_removed_targets(existing_targets) + + assert_equal( + { + map_keys: [TEST_CONTENT_INODES[1].ino], + unwatched_pe_pos: Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION, + }, + { + map_keys: 
pf.instance_variable_get(:@map).keys, + unwatched_pe_pos: pe_to_unwatch.read_pos, + } + ) + + unwatched_pe_retaken = pf[TEST_CONTENT_INODES[0]] + assert_not_equal pe_to_unwatch, unwatched_pe_retaken + end end sub_test_case 'FilePositionEntry' do diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 81bf0dfa7d..9d2cfeca39 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2704,22 +2704,14 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW assert_equal( { - # TODO: This is BUG!! We need to fix it and replace this with the next. - record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], - # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], tail_watcher_inodes: [inode_0, inode_1, inode_0], tail_watcher_io_handler_opened_statuses: [false, false, false], - # TODO: This is BUG!! We need to fix it and replace this with the next. position_entries: [ - ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0], ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], ], - # position_entries: [ - # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], - # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - # ], }, { record_values: record_values, @@ -2802,7 +2794,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW tail_watcher_inodes: [inode_0, inode_1, inode_0], tail_watcher_io_handler_opened_statuses: [false, false, false], position_entries: [ - ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # The recorded path is old, but it is no problem. The path is not used when using follow_inodes. 
+ ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0], ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], ], }, @@ -2861,8 +2854,9 @@ def test_updateTW_after_refreshTW # This overwrites `@tails["tail.txt"]` d.instance.refresh_watchers - # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` tries to update the TailWatcher: # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # However, it is already added in `refresh_watcher`, so `update_watcher` doesn't create the new TailWatcher. # The old TailWathcer is detached here since `rotate_wait` is just `1s`. sleep 3 @@ -2886,22 +2880,15 @@ def test_updateTW_after_refreshTW assert_equal( { - # TODO: This is BUG!! We need to fix it and replace this with the next. - record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], - # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], tail_watcher_inodes: [inode_0, inode_1, inode_0], tail_watcher_io_handler_opened_statuses: [false, false, false], - # TODO: This is BUG!! We need to fix it and replace this with the next. position_entries: [ - ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # The recorded path is old, but it is no problem. The path is not used when using follow_inodes. + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0], ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], ], - # position_entries: [ - # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], - # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - # ], }, { record_values: record_values,