From 7ab49c3f7209adcdbffcd51b053e4d4cf60add0a Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 10 Oct 2024 23:33:42 +0900 Subject: [PATCH 1/2] fluentd command: add --with-source-only It launches Fluentd with Input plugins only. Here is the specification. * Those Input plugins emits the events to SourceOnlyBufferAgent. * The events is kept in the buf_file during the source-only-mode. * SIGRTMIN(34) cancels the mode. * After canceled, the new agent starts to load the buffer. Signed-off-by: Daijiro Fukuda --- lib/fluent/command/fluentd.rb | 4 + lib/fluent/engine.rb | 4 + lib/fluent/plugin/buf_file.rb | 2 + lib/fluent/plugin/out_buffer.rb | 78 ++++++++++++++++++++ lib/fluent/plugin/output.rb | 1 + lib/fluent/plugin_helper/event_emitter.rb | 12 +++ lib/fluent/root_agent.rb | 90 ++++++++++++++++++----- lib/fluent/source_only_buffer_agent.rb | 47 ++++++++++++ lib/fluent/supervisor.rb | 24 ++++++ lib/fluent/system_config.rb | 5 +- 10 files changed, 247 insertions(+), 20 deletions(-) create mode 100644 lib/fluent/plugin/out_buffer.rb create mode 100644 lib/fluent/source_only_buffer_agent.rb diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index da25b74cca..5e4cff55e9 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -127,6 +127,10 @@ cmd_opts[:without_source] = b } +op.on('--with-source-only', "invoke a fluentd only with input plugins", TrueClass) {|b| + cmd_opts[:with_source_only] = b +} + op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s| if (s == 'yaml') || (s == 'yml') cmd_opts[:config_file_type] = s.to_sym diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index afac3167ca..65db5c7a39 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -136,6 +136,10 @@ def flush! @root_agent.flush! end + def cancel_source_only! + @root_agent.cancel_source_only! + end + def now # TODO thread update Fluent::EventTime.now diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 7f11478a01..e36d2a1dad 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -42,6 +42,8 @@ class FileBuffer < Fluent::Plugin::Buffer config_param :file_permission, :string, default: nil # '0644' (Fluent::DEFAULT_FILE_PERMISSION) config_param :dir_permission, :string, default: nil # '0755' (Fluent::DEFAULT_DIR_PERMISSION) + attr_reader :buffer_path + def initialize super @symlink_path = nil diff --git a/lib/fluent/plugin/out_buffer.rb b/lib/fluent/plugin/out_buffer.rb new file mode 100644 index 0000000000..e72b718084 --- /dev/null +++ b/lib/fluent/plugin/out_buffer.rb @@ -0,0 +1,78 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/output' + +module Fluent::Plugin + class BufferOutput < Output + Fluent::Plugin.register_output("buffer", self) + helpers :event_emitter + + desc "Try to remove the buffer directory after terminate." + + " Mainly for the --with-source-only feature." + config_param :cleanup_after_shutdown, :bool, default: false + + config_section :buffer do + config_set_default :@type, "file" + config_set_default :chunk_keys, ["tag"] + config_set_default :flush_mode, :interval + config_set_default :flush_interval, 10 + end + + def multi_workers_ready? + true + end + + def initialize + super + + @buffer_path = nil + end + + def configure(conf) + super + + raise Fluent::ConfigError, "The buffer plugin does not be supported. It must have 'buffer_path' getter. You can use 'file' buffer." unless @buffer.respond_to?(:buffer_path) + + @buffer_path = @buffer.buffer_path + end + + def write(chunk) + return if chunk.empty? + router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read)) + end + + def terminate + super + + return unless cleanup_after_shutdown + + unless Dir.empty?(@buffer_path) + if @buffer_config.flush_at_shutdown + log.warn "some buffer files remain in #{@buffer_path}, so cancel cleanup the directory." + + " Please consider saving or recovering the buffer files in the directory." + end + return + end + + begin + FileUtils.remove_dir(@buffer_path) + rescue => e + log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e + end + end + end +end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0aed67db95..8aa4fc8522 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1384,6 +1384,7 @@ def retry_state(randomize) end def submit_flush_once + return unless @buffer_config.flush_thread_count > 0 # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb index ba089e485a..60d34391b7 100644 --- a/lib/fluent/plugin_helper/event_emitter.rb +++ b/lib/fluent/plugin_helper/event_emitter.rb @@ -26,6 +26,9 @@ module EventEmitter def router @_event_emitter_used_actually = true + + return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router + if @_event_emitter_lazy_init @router = @primary_instance.router end @@ -48,6 +51,14 @@ def event_emitter_used_actually? @_event_emitter_used_actually end + def event_emitter_set_source_only + @_event_emitter_force_source_only_router = true + end + + def event_emitter_cancel_source_only + @_event_emitter_force_source_only_router = false + end + def event_emitter_router(label_name) if label_name if label_name == "@ROOT" @@ -72,6 +83,7 @@ def initialize super @_event_emitter_used_actually = false @_event_emitter_lazy_init = false + @_event_emitter_force_source_only_router = false @router = nil end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index d10f366044..8411152da4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -22,6 +22,7 @@ require 'fluent/plugin' require 'fluent/system_config' require 'fluent/time' +require 'fluent/source_only_buffer_agent' module Fluent # @@ -54,17 +55,22 @@ def initialize(log:, system_config: SystemConfig.new) @inputs = [] @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil - @without_source = false - @enable_input_metrics = false + @without_source = system_config.without_source || false + @with_source_only = system_config.with_source_only || false + @source_only_buffer_agent = nil + @enable_input_metrics = system_config.enable_input_metrics || false suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? - @without_source = system_config.without_source unless system_config.without_source.nil? - @enable_input_metrics = !!system_config.enable_input_metrics end attr_reader :inputs attr_reader :labels + def source_only_router + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + @source_only_buffer_agent.event_router + end + def configure(conf) used_worker_ids = [] available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a @@ -148,6 +154,8 @@ def configure(conf) super + setup_source_only_buffer_agent if @with_source_only + # initialize elements if @without_source log.info :worker0, "'--without-source' is applied. Ignore sections" @@ -169,16 +177,29 @@ def setup_error_label(e) @error_collector = error_label.event_router end - def lifecycle(desc: false, kind_callback: nil) - kind_or_label_list = if desc - [:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten - else - [:input, :output_with_router, @labels.values, :filter, :output].flatten - end - kind_or_label_list.each do |kind| + def setup_source_only_buffer_agent(flush: false) + @source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config) + @source_only_buffer_agent.configure(flush: flush) + end + + def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + unless kind_or_agent_list + if @with_source_only + kind_or_agent_list = [:input, @source_only_buffer_agent] + elsif @source_only_buffer_agent + # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. + kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten + else + kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten + end + + kind_or_agent_list.reverse! if desc + end + + kind_or_agent_list.each do |kind| if kind.respond_to?(:lifecycle) - label = kind - label.lifecycle(desc: desc) do |plugin, display_kind| + agent = kind + agent.lifecycle(desc: desc) do |plugin, display_kind| yield plugin, display_kind end else @@ -198,8 +219,8 @@ def lifecycle(desc: false, kind_callback: nil) end end - def start - lifecycle(desc: true) do |i| # instance + def start(kind_or_agent_list: nil) + lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance i.start unless i.started? # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case # and it causes deadlock for small buffer/queue output. To avoid such problem, @@ -231,13 +252,45 @@ def flush! flushing_threads.each{|t| t.join } end - def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins + def cancel_source_only! + # TODO exclusive lock + if @with_source_only + log.info "cancel --with-source-only mode and start the other plugins" + @with_source_only = false + start + + lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only) + + # Want to make sure that the source_only_router finishes all process. + # Strictly speaking, we must make a forwarding feature for the source_only_router + # to make sure it. + # However, it would need exclusive lock for EventRouter and worsen its performance. + # So, just sleep a few seconds here. + sleep 5 + + shutdown(kind_or_agent_list: [@source_only_buffer_agent]) + @source_only_buffer_agent = nil + end + + if @source_only_buffer_agent + log.info "do nothing for canceling --with-source-only because it is already canceled, and the loading agent already exists" + return + end + + # TODO This agent should stop after flushing its all buffer. + log.info "starts the loading agent for --with-source-only" + setup_source_only_buffer_agent(flush: true) + start(kind_or_agent_list: [@source_only_buffer_agent]) + end + + def shutdown(kind_or_agent_list: nil) + # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others. # Plugins should be separated and be in sandbox to protect data in each plugins/buffers. lifecycle_safe_sequence = ->(method, checker) { - lifecycle do |instance, kind| + lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind| begin log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id instance.__send__(method) unless instance.__send__(checker) @@ -260,7 +313,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a operation_threads.each{|t| t.join } operation_threads.clear } - lifecycle(kind_callback: callback) do |instance, kind| + lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true begin @@ -318,6 +371,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) + input.event_emitter_set_source_only if @with_source_only if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/source_only_buffer_agent.rb b/lib/fluent/source_only_buffer_agent.rb new file mode 100644 index 0000000000..3e31082d40 --- /dev/null +++ b/lib/fluent/source_only_buffer_agent.rb @@ -0,0 +1,47 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'securerandom' + +require 'fluent/agent' +require 'fluent/system_config' + +module Fluent + class SourceOnlyBufferAgent < Agent + BUFFER_DIR_NAME = SecureRandom.uuid + + def initialize(log:, system_config:) + super(log: log) + + @buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME) + end + + def configure(flush: false) + super( + Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [ + Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT', 'cleanup_after_shutdown' => 'true'}, [ + Config::Element.new('buffer', '', { + 'path' => @buffer_path, + 'flush_at_shutdown' => flush ? 'true' : 'false', + 'flush_thread_count' => flush ? 1 : 0, + 'overflow_action' => "drop_oldest_chunk", + }, []) + ]) + ]) + ) + end + end +end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index d565abf600..c95ab125fa 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -189,6 +189,11 @@ def install_supervisor_signal_handlers $log.debug 'fluentd supervisor process got SIGUSR2' supervisor_sigusr2_handler end + + trap 34 do + $log.debug 'fluentd supervisor process got SIGRTMIN' + cancel_source_only + end end if Fluent.windows? @@ -312,6 +317,10 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def cancel_source_only + send_signal_to_workers(34) + end + def supervisor_dump_handler_for_windows # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file, # and it is implemented before the implementation of the function for Windows. @@ -486,6 +495,7 @@ def self.default_options suppress_repeated_stacktrace: true, ignore_repeated_log_interval: nil, without_source: nil, + with_source_only: nil, enable_input_metrics: nil, enable_size_metrics: nil, use_v1_config: true, @@ -840,6 +850,10 @@ def install_main_process_signal_handlers trap :CONT do dump_non_windows end + + trap 34 do + cancel_source_only + end end end @@ -893,6 +907,16 @@ def flush_buffer end end + def cancel_source_only + Thread.new do + begin + Fluent::Engine.cancel_source_only! + rescue Exception => e + $log.warn "failed to cancel source only", error: e + end + end + end + def reload_config Thread.new do $log.debug('worker got SIGUSR2') diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 917889018d..99e7e4742d 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -25,7 +25,7 @@ class SystemConfig :workers, :restart_worker_interval, :root_dir, :log_level, :suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump, :log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval, - :without_source, :rpc_endpoint, :enable_get_dump, :process_name, + :without_source, :with_source_only, :rpc_endpoint, :enable_get_dump, :process_name, :file_permission, :dir_permission, :counter_server, :counter_client, :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket, :metrics, :enable_input_metrics, :enable_size_metrics, :enable_jit @@ -41,7 +41,8 @@ class SystemConfig config_param :emit_error_log_interval, :time, default: nil config_param :suppress_config_dump, :bool, default: nil config_param :log_event_verbose, :bool, default: nil - config_param :without_source, :bool, default: nil + config_param :without_source, :bool, default: nil + config_param :with_source_only, :bool, default: nil config_param :rpc_endpoint, :string, default: nil config_param :enable_get_dump, :bool, default: nil config_param :process_name, :string, default: nil From 1cbbd9a81b8576374bcc32454e959c42075bb399 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 10 Oct 2024 23:26:42 +0900 Subject: [PATCH 2/2] Restart without downtime Add a new feature: Update/Reload without downtime. 1. The supervisor receives SIGUSR2. 2. Spawn a new supervisor. 3. Take over shared sockets. 4. Launch new workers, and stop old processes in parallel. * Launch new workers with source-only mode * Limit to restart_without_downtime_ready? input plugin * Send SIGTERM to the old supervisor after 10s delay from 3. 5. The old supervisor stops and sends SIGRTMIN(34) to the new one. 6. The new workers run fully. Problem to solve: Updating Fluentd or reloading a config causes downtime. Plugins that receive data as a server, such as `in_udp`, `in_tcp`, and `in_syslog`, cannot receive data during this time. This means that the data sent by a client is lost during this time unless the client has a re-sending feature. This makes updating Fluentd or reloading a config difficult in some cases. Note: need these feature * https://github.com/fluent/fluentd/pull/4661 * https://github.com/treasure-data/serverengine/pull/146 Co-authored-by: Shizuo Fujita Signed-off-by: Daijiro Fukuda --- lib/fluent/engine.rb | 4 +- lib/fluent/plugin/in_syslog.rb | 4 ++ lib/fluent/plugin/in_tcp.rb | 4 ++ lib/fluent/plugin/in_udp.rb | 4 ++ lib/fluent/plugin/input.rb | 4 ++ lib/fluent/root_agent.rb | 47 +++++++++--- lib/fluent/supervisor.rb | 127 ++++++++++++++++++++++++++++----- 7 files changed, 167 insertions(+), 27 deletions(-) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 65db5c7a39..94d09aa334 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -49,7 +49,7 @@ def initialize attr_reader :root_agent, :system_config, :supervisor_mode - def init(system_config, supervisor_mode: false) + def init(system_config, supervisor_mode: false, start_in_parallel: false) @system_config = system_config @supervisor_mode = supervisor_mode @@ -58,7 +58,7 @@ def init(system_config, supervisor_mode: false) @log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil? - @root_agent = RootAgent.new(log: log, system_config: @system_config) + @root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel) self end diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 28ad1c5dd9..b33a479aee 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -156,6 +156,10 @@ def multi_workers_ready? true end + def restart_without_downtime_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index bd2ea83e5b..224eecb043 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -101,6 +101,10 @@ def multi_workers_ready? true end + def restart_without_downtime_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..006524a95c 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -65,6 +65,10 @@ def multi_workers_ready? true end + def restart_without_downtime_ready? + true + end + def start super diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 7a6909f7a9..7fb9b3608a 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -70,6 +70,10 @@ def metric_callback(es) def multi_workers_ready? false end + + def restart_without_downtime_ready? + false + end end end end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 8411152da4..f5870feaa2 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -48,7 +48,35 @@ module Fluent class RootAgent < Agent ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label - def initialize(log:, system_config: SystemConfig.new) + class SourceOnlyMode + DISABELD = 0 + NORMAL = 1 + RESTART_WITHOUT_DOWNTIME_READY_ONLY = 2 + + def initialize(with_source_only, start_in_parallel) + if start_in_parallel + @mode = RESTART_WITHOUT_DOWNTIME_READY_ONLY + elsif with_source_only + @mode = NORMAL + else + @mode = DISABELD + end + end + + def source_only? + @mode != DISABELD + end + + def restart_without_downtime_ready_only? + @mode == RESTART_WITHOUT_DOWNTIME_READY_ONLY + end + + def disable! + @mode = DISABELD + end + end + + def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false) super(log: log) @labels = {} @@ -56,7 +84,7 @@ def initialize(log:, system_config: SystemConfig.new) @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil @without_source = system_config.without_source || false - @with_source_only = system_config.with_source_only || false + @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel) @source_only_buffer_agent = nil @enable_input_metrics = system_config.enable_input_metrics || false @@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :labels def source_only_router - raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.source_only? @source_only_buffer_agent.event_router end @@ -154,7 +182,7 @@ def configure(conf) super - setup_source_only_buffer_agent if @with_source_only + setup_source_only_buffer_agent if @source_only_mode.source_only? # initialize elements if @without_source @@ -184,7 +212,7 @@ def setup_source_only_buffer_agent(flush: false) def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) unless kind_or_agent_list - if @with_source_only + if @source_only_mode.source_only? kind_or_agent_list = [:input, @source_only_buffer_agent] elsif @source_only_buffer_agent # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. @@ -210,6 +238,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) end display_kind = (kind == :output_with_router ? :output : kind) list.each do |instance| + if @source_only_mode.restart_without_downtime_ready_only? + next unless instance.restart_without_downtime_ready? + end yield instance, display_kind end end @@ -254,9 +285,9 @@ def flush! def cancel_source_only! # TODO exclusive lock - if @with_source_only + if @source_only_mode.source_only? log.info "cancel --with-source-only mode and start the other plugins" - @with_source_only = false + @source_only_mode.disable! start lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only) @@ -371,7 +402,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) - input.event_emitter_set_source_only if @with_source_only + input.event_emitter_set_source_only if @source_only_mode.source_only? if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index c95ab125fa..851030e0e5 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,6 +43,10 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil + @starting_new_supervisor_without_downtime = false + @new_supervisor_pid = nil + start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir @@ -65,10 +69,21 @@ def before_run if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" + elsif start_in_parallel + begin + raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + $log.info "restart-without-downtime: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + rescue => e + $log.error "restart-without-downtime: cancel sequence because failed to take over the shared sockets", error: e + raise + end else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s end + + stop_parallel_old_supervisor_after_delay if start_in_parallel end def after_run @@ -76,7 +91,9 @@ def after_run stop_rpc_server if @rpc_endpoint stop_counter_server if @counter cleanup_lock_dir - Fluent::Supervisor.cleanup_resources + Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_without_downtime + + notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_without_downtime end def cleanup_lock_dir @@ -138,7 +155,7 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" if Fluent.windows? - supervisor_sigusr2_handler + graceful_reload else Process.kill :USR2, Process.pid end @@ -172,6 +189,47 @@ def stop_counter_server @counter.stop end + def stop_parallel_old_supervisor_after_delay + # TODO if the new supervisor fails to start and this is not called, + # it would be necessary to update the pid in the PID file to the old one when daemonized. + + Thread.new do + # Delay to avoid worker downtime as much as possible. + # Even if the downtime occurs, it is no problem because the socket buffer works, + # as long as the capacity is not exceeded. + sleep 10 + old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i + if old_pid + $log.info "restart-without-downtime: stop the old supervisor" + Process.kill :TERM, old_pid + end + rescue => e + $log.warn "restart-without-downtime: failed to stop the old supervisor." + + " If the old one does not exist, please send '34' signal to this new process to start to work fully." + + " If it exists, something went wrong. Please kill the old one manually.", + error: e + end + end + + def notify_new_supervisor_that_old_one_has_stopped + if config[:pid_path] + new_pid = File.read(config[:pid_path]).to_i + else + raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid + new_pid = @new_supervisor_pid + end + + $log.info "restart-without-downtime: notify the new supervisor (pid: #{new_pid}) that old one has stopped" + Process.kill 34, new_pid + rescue => e + $log.error( + "restart-without-downtime: failed to notify the new supervisor." + + " Please send '34' signal to the new supervisor process manually" + + " if it does not start to work fully.", + error: e + ) + end + def install_supervisor_signal_handlers return if Fluent.windows? @@ -187,7 +245,11 @@ def install_supervisor_signal_handlers trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' - supervisor_sigusr2_handler + if Fluent.windows? + graceful_reload + else + restart_without_downtime + end end trap 34 do @@ -259,7 +321,7 @@ def install_windows_event_handler when :usr1 supervisor_sigusr1_handler when :usr2 - supervisor_sigusr2_handler + graceful_reload when :cont supervisor_dump_handler_for_windows when :stop_event_thread @@ -289,7 +351,7 @@ def supervisor_sigusr1_handler send_signal_to_workers(:USR1) end - def supervisor_sigusr2_handler + def graceful_reload conf = nil t = Thread.new do $log.info 'Reloading new config' @@ -317,7 +379,38 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def restart_without_downtime + # TODO exclusive lock + + $log.info "start restart-without-downtime sequence" + + if @starting_new_supervisor_without_downtime + $log.warn "restart-without-downtime: canceled because it is already starting" + return + end + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + $log.warn "restart-without-downtime: canceled because the previous sequence is still running" + return + end + + @starting_new_supervisor_without_downtime = true + commands = [ServerEngine.ruby_bin_path, $0] + ARGV + env_to_add = { + "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN, + "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}", + } + pid = Process.spawn(env_to_add, commands.join(" ")) + @new_supervisor_pid = pid unless config[:daemonize] + rescue => e + $log.error "restart-without-downtime: failed", error: e + @starting_new_supervisor_without_downtime = false + end + def cancel_source_only + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + $log.info "restart-without-downtime: done all sequences, now the new workers starts to work fully" + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end send_signal_to_workers(34) end @@ -509,12 +602,11 @@ def self.default_options } end - def self.cleanup_resources - unless Fluent.windows? - if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') - FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) - end - end + def self.cleanup_socketmanager_path + return if Fluent.windows? + return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + + FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) end def initialize(cl_opt) @@ -578,7 +670,7 @@ def run_supervisor(dry_run: false) begin ServerEngine::Privilege.change(@chuser, @chgroup) MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config, supervisor_mode: true) + Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf, dry_run: dry_run) rescue Fluent::ConfigError => e $log.error 'config error', file: @config_path, error: e @@ -623,10 +715,10 @@ def run_worker File.umask(@chumask.to_i(8)) end MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config) + Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf) Fluent::Engine.run - self.class.cleanup_resources if @standalone_worker + self.class.cleanup_socketmanager_path if @standalone_worker exit 0 end end @@ -844,7 +936,8 @@ def install_main_process_signal_handlers end trap :USR2 do - reload_config + # Do nothing + # TODO consider suitable code for this end trap :CONT do