Skip to content

Commit

Permalink
Restart without downtime
Browse files Browse the repository at this point in the history
Add a new feature: Update/Reload without downtime.

1. The supervisor receives SIGUSR2.
2. Spawn a new supervisor.
3. Take over shared sockets.
4. Launch new workers, and stop old processes in parallel.
   * Launch new workers with source-only mode
     * Limit to restart_without_downtime_ready? input plugin
   * Send SIGTERM to the old supervisor after 10s delay from 3.
5. The old supervisor stops and sends SIGRTMIN(34) to the new one.
6. The new workers run fully.

Problem to solve:

Updating Fluentd or reloading a config causes downtime.
Plugins that receive data as a server, such as `in_udp`, `in_tcp`,
and `in_syslog`, cannot receive data during this time.
This means that the data sent by a client is lost during this
time unless the client has a re-sending feature.
This makes updating Fluentd or reloading a config difficult in
some cases.

Note: need these feature

* #4661
* treasure-data/serverengine#146

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Oct 11, 2024
1 parent 7ab49c3 commit 630f809
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 27 deletions.
4 changes: 2 additions & 2 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def initialize

attr_reader :root_agent, :system_config, :supervisor_mode

def init(system_config, supervisor_mode: false)
def init(system_config, supervisor_mode: false, start_in_parallel: false)
@system_config = system_config
@supervisor_mode = supervisor_mode

Expand All @@ -58,7 +58,7 @@ def init(system_config, supervisor_mode: false)

@log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil?

@root_agent = RootAgent.new(log: log, system_config: @system_config)
@root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel)

self
end
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ def multi_workers_ready?
true
end

def restart_without_downtime_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def multi_workers_ready?
true
end

def restart_without_downtime_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def multi_workers_ready?
true
end

def restart_without_downtime_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def metric_callback(es)
def multi_workers_ready?
false
end

def restart_without_downtime_ready?
false
end
end
end
end
47 changes: 39 additions & 8 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,43 @@ module Fluent
class RootAgent < Agent
ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label

def initialize(log:, system_config: SystemConfig.new)
class SourceOnlyMode
DISABELD = 0
NORMAL = 1
RESTART_WITHOUT_DOWNTIME_READY_ONLY = 2

def initialize(with_source_only, start_in_parallel)
if start_in_parallel
@mode = RESTART_WITHOUT_DOWNTIME_READY_ONLY
elsif with_source_only
@mode = NORMAL
else
@mode = DISABELD
end
end

def source_only?
@mode != DISABELD
end

def restart_without_downtime_ready_only?
@mode == RESTART_WITHOUT_DOWNTIME_READY_ONLY
end

def disable!
@mode = DISABELD
end
end

def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false)
super(log: log)

@labels = {}
@inputs = []
@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil
@without_source = system_config.without_source || false
@with_source_only = system_config.with_source_only || false
@source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel)
@source_only_buffer_agent = nil
@enable_input_metrics = system_config.enable_input_metrics || false

Expand All @@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def source_only_router
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.source_only?
@source_only_buffer_agent.event_router
end

Expand Down Expand Up @@ -154,7 +182,7 @@ def configure(conf)

super

setup_source_only_buffer_agent if @with_source_only
setup_source_only_buffer_agent if @source_only_mode.source_only?

# initialize <source> elements
if @without_source
Expand Down Expand Up @@ -184,7 +212,7 @@ def setup_source_only_buffer_agent(flush: false)

def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
unless kind_or_agent_list
if @with_source_only
if @source_only_mode.source_only?
kind_or_agent_list = [:input, @source_only_buffer_agent]
elsif @source_only_buffer_agent
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
Expand All @@ -210,6 +238,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
end
display_kind = (kind == :output_with_router ? :output : kind)
list.each do |instance|
if @source_only_mode.restart_without_downtime_ready_only?
next unless instance.restart_without_downtime_ready?
end
yield instance, display_kind
end
end
Expand Down Expand Up @@ -254,9 +285,9 @@ def flush!

def cancel_source_only!
# TODO exclusive lock
if @with_source_only
if @source_only_mode.source_only?
log.info "cancel --with-source-only mode and start the other plugins"
@with_source_only = false
@source_only_mode.disable!
start

lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only)
Expand Down Expand Up @@ -371,7 +402,7 @@ def add_source(type, conf)
# See also 'fluentd/plugin/input.rb'
input.context_router = @event_router
input.configure(conf)
input.event_emitter_set_source_only if @with_source_only
input.event_emitter_set_source_only if @source_only_mode.source_only?
if @enable_input_metrics
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
end
Expand Down
127 changes: 110 additions & 17 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def before_run
@rpc_endpoint = nil
@rpc_server = nil
@counter = nil
@socket_manager_server = nil
@starting_new_supervisor_without_downtime = false
@new_supervisor_pid = nil
start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")

@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
Expand All @@ -65,18 +69,31 @@ def before_run

if config[:disable_shared_socket]
$log.info "shared socket for multiple workers is disabled"
elsif start_in_parallel
begin
raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
@socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
$log.info "restart-without-downtime: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH']
rescue => e
$log.error "restart-without-downtime: cancel sequence because failed to take over the shared sockets", error: e
raise
end
else
server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
@socket_manager_server = ServerEngine::SocketManager::Server.open
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
end

stop_parallel_old_supervisor_after_delay if start_in_parallel
end

def after_run
stop_windows_event_thread if Fluent.windows?
stop_rpc_server if @rpc_endpoint
stop_counter_server if @counter
cleanup_lock_dir
Fluent::Supervisor.cleanup_resources
Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_without_downtime

notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_without_downtime
end

def cleanup_lock_dir
Expand Down Expand Up @@ -138,7 +155,7 @@ def run_rpc_server
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
$log.debug "fluentd RPC got /api/config.gracefulReload request"
if Fluent.windows?
supervisor_sigusr2_handler
graceful_reload
else
Process.kill :USR2, Process.pid
end
Expand Down Expand Up @@ -172,6 +189,47 @@ def stop_counter_server
@counter.stop
end

def stop_parallel_old_supervisor_after_delay
# TODO if the new supervisor fails to start and this is not called,
# it would be necessary to update the pid in the PID file to the old one when daemonized.

Thread.new do
# Delay to avoid worker downtime as much as possible.
# Even if the downtime occurs, it is no problem because the socket buffer works,
# as long as the capacity is not exceeded.
sleep 10
old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i
if old_pid
$log.info "restart-without-downtime: stop the old supervisor"
Process.kill :TERM, old_pid
end
rescue => e
$log.warn "restart-without-downtime: failed to stop the old supervisor." +
" If the old one does not exist, please send '34' signal to this new process to start to work fully." +
" If it exists, something went wrong. Please kill the old one manually.",
error: e
end
end

def notify_new_supervisor_that_old_one_has_stopped
if config[:pid_path]
new_pid = File.read(config[:pid_path]).to_i
else
raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid
new_pid = @new_supervisor_pid
end

$log.info "restart-without-downtime: notify the new supervisor (pid: #{new_pid}) that old one has stopped"
Process.kill 34, new_pid
rescue => e
$log.error(
"restart-without-downtime: failed to notify the new supervisor." +
" Please send '34' signal to the new supervisor process manually" +
" if it does not start to work fully.",
error: e
)
end

def install_supervisor_signal_handlers
return if Fluent.windows?

Expand All @@ -187,7 +245,11 @@ def install_supervisor_signal_handlers

trap :USR2 do
$log.debug 'fluentd supervisor process got SIGUSR2'
supervisor_sigusr2_handler
if Fluent.windows?
graceful_reload
else
restart_without_downtime
end
end

trap 34 do
Expand Down Expand Up @@ -259,7 +321,7 @@ def install_windows_event_handler
when :usr1
supervisor_sigusr1_handler
when :usr2
supervisor_sigusr2_handler
graceful_reload
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
Expand Down Expand Up @@ -289,7 +351,7 @@ def supervisor_sigusr1_handler
send_signal_to_workers(:USR1)
end

def supervisor_sigusr2_handler
def graceful_reload
conf = nil
t = Thread.new do
$log.info 'Reloading new config'
Expand Down Expand Up @@ -317,7 +379,38 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

def restart_without_downtime
# TODO exclusive lock

$log.info "start restart-without-downtime sequence"

if @starting_new_supervisor_without_downtime
$log.warn "restart-without-downtime: canceled because it is already starting"
return
end
if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
$log.warn "restart-without-downtime: canceled because the previous sequence is still running"
return
end

@starting_new_supervisor_without_downtime = true
commands = [ServerEngine.ruby_bin_path, $0] + ARGV
env_to_add = {
"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN,
"FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}",
}
pid = Process.spawn(env_to_add, commands.join(" "))
@new_supervisor_pid = pid unless config[:daemonize]
rescue => e
$log.error "restart-without-downtime: failed", error: e
@starting_new_supervisor_without_downtime = false
end

def cancel_source_only
if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
$log.info "restart-without-downtime: done all sequences, now the new workers starts to work fully"
ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
end
send_signal_to_workers(34)
end

Expand Down Expand Up @@ -509,12 +602,11 @@ def self.default_options
}
end

def self.cleanup_resources
unless Fluent.windows?
if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
end
end
def self.cleanup_socketmanager_path
return if Fluent.windows?
return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')

FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
end

def initialize(cl_opt)
Expand Down Expand Up @@ -578,7 +670,7 @@ def run_supervisor(dry_run: false)
begin
ServerEngine::Privilege.change(@chuser, @chgroup)
MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
Fluent::Engine.init(@system_config, supervisor_mode: true)
Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"))
Fluent::Engine.run_configure(@conf, dry_run: dry_run)
rescue Fluent::ConfigError => e
$log.error 'config error', file: @config_path, error: e
Expand Down Expand Up @@ -623,10 +715,10 @@ def run_worker
File.umask(@chumask.to_i(8))
end
MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
Fluent::Engine.init(@system_config)
Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"))
Fluent::Engine.run_configure(@conf)
Fluent::Engine.run
self.class.cleanup_resources if @standalone_worker
self.class.cleanup_socketmanager_path if @standalone_worker
exit 0
end
end
Expand Down Expand Up @@ -844,7 +936,8 @@ def install_main_process_signal_handlers
end

trap :USR2 do
reload_config
# Do nothing
# TODO consider suitable code for this
end

trap :CONT do
Expand Down

0 comments on commit 630f809

Please sign in to comment.