Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/Reload without downtime #4624

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@
cmd_opts[:without_source] = b
}

op.on('--with-source-only', "invoke a fluentd only with input plugins", TrueClass) {|b|
cmd_opts[:with_source_only] = b
}

op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
if (s == 'yaml') || (s == 'yml')
cmd_opts[:config_file_type] = s.to_sym
Expand Down
8 changes: 6 additions & 2 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def initialize

attr_reader :root_agent, :system_config, :supervisor_mode

def init(system_config, supervisor_mode: false)
def init(system_config, supervisor_mode: false, start_in_parallel: false)
@system_config = system_config
@supervisor_mode = supervisor_mode

Expand All @@ -58,7 +58,7 @@ def init(system_config, supervisor_mode: false)

@log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil?

@root_agent = RootAgent.new(log: log, system_config: @system_config)
@root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel)

self
end
Expand Down Expand Up @@ -136,6 +136,10 @@ def flush!
@root_agent.flush!
end

def cancel_source_only!
@root_agent.cancel_source_only!
end

def now
# TODO thread update
Fluent::EventTime.now
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class FileBuffer < Fluent::Plugin::Buffer
config_param :file_permission, :string, default: nil # '0644' (Fluent::DEFAULT_FILE_PERMISSION)
config_param :dir_permission, :string, default: nil # '0755' (Fluent::DEFAULT_DIR_PERMISSION)

attr_reader :buffer_path

def initialize
super
@symlink_path = nil
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ def multi_workers_ready?
true
end

def restart_without_downtime_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def multi_workers_ready?
true
end

def restart_without_downtime_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def multi_workers_ready?
true
end

def restart_without_downtime_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def metric_callback(es)
def multi_workers_ready?
false
end

def restart_without_downtime_ready?
false
end
end
end
end
78 changes: 78 additions & 0 deletions lib/fluent/plugin/out_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferOutput < Output
Fluent::Plugin.register_output("buffer", self)
helpers :event_emitter

desc "Try to remove the buffer directory after terminate." +
" Mainly for the --with-source-only feature."
config_param :cleanup_after_shutdown, :bool, default: false

config_section :buffer do
config_set_default :@type, "file"
config_set_default :chunk_keys, ["tag"]
config_set_default :flush_mode, :interval
config_set_default :flush_interval, 10
end

def multi_workers_ready?
true
end

def initialize
super

@buffer_path = nil
end

def configure(conf)
super

raise Fluent::ConfigError, "The buffer plugin does not be supported. It must have 'buffer_path' getter. You can use 'file' buffer." unless @buffer.respond_to?(:buffer_path)

@buffer_path = @buffer.buffer_path
end

def write(chunk)
return if chunk.empty?
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
end

def terminate
super

return unless cleanup_after_shutdown

unless Dir.empty?(@buffer_path)
if @buffer_config.flush_at_shutdown
log.warn "some buffer files remain in #{@buffer_path}, so cancel cleanup the directory." +
" Please consider saving or recovering the buffer files in the directory."
end
return
end

begin
FileUtils.remove_dir(@buffer_path)
rescue => e
log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e
end
end
end
end
1 change: 1 addition & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ def retry_state(randomize)
end

def submit_flush_once
return unless @buffer_config.flush_thread_count > 0
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,14 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

def event_emitter_set_source_only
@_event_emitter_force_source_only_router = true
end

def event_emitter_cancel_source_only
@_event_emitter_force_source_only_router = false
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +83,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_source_only_router = false
@router = nil
end

Expand Down
Loading
Loading