Skip to content

Commit

Permalink
Merge pull request #92 from OhJuhun/implement-delegation-token-max-li…
Browse files Browse the repository at this point in the history
…fetime-option

Implement delegation token max lifetime option
  • Loading branch information
ashie authored Mar 19, 2024
2 parents c9a68b5 + 7c056fe commit ac69465
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion fluent-plugin-webhdfs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ Gem::Specification.new do |gem|
gem.add_development_dependency "bzip2-ffi"
gem.add_development_dependency "zstandard"
gem.add_runtime_dependency "fluentd", '>= 0.14.22'
gem.add_runtime_dependency "webhdfs", '>= 0.10.0'
gem.add_runtime_dependency "webhdfs", '>= 0.11.0'
end
10 changes: 7 additions & 3 deletions lib/fluent/plugin/out_webhdfs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output
config_param :renew_kerberos_delegation_token, :bool, default: false
desc 'delegation token reuse timer (default 8h)'
config_param :renew_kerberos_delegation_token_interval, :time, default: 8 * 60 * 60
desc 'delegation token max-lifetime (default 7d)'
config_param :kerberos_delegation_token_max_lifetime, :time, default: 7 * 24 * 60 * 60

SUPPORTED_COMPRESS = [:gzip, :bzip2, :snappy, :hadoop_snappy, :lzo_command, :zstd, :text]
desc "Compression method (#{SUPPORTED_COMPRESS.join(',')})"
Expand Down Expand Up @@ -114,7 +116,7 @@ def configure(conf)
else 86400
end
if buffer_config = conf.elements(name: "buffer").first
timekey = buffer_config["timekey"] || timekey
timekey = buffer_config["timekey"] || timekey
end

compat_parameters_convert(conf, :buffer, default_chunk_key: "time")
Expand Down Expand Up @@ -189,7 +191,9 @@ def configure(conf)
end

@renew_kerberos_delegation_token_interval_hour = nil
@kerberos_delegation_token_max_lifetime_hour = nil
if @renew_kerberos_delegation_token
@kerberos_delegation_token_max_lifetime_hour = @kerberos_delegation_token_max_lifetime / 60 / 60
unless @username
raise Fluent::ConfigError, "username is missing. If you want to reuse delegation token, follow with kerberos accounts"
end
Expand All @@ -215,7 +219,7 @@ def multi_workers_ready?
end

def prepare_client(host, port, username)
client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour)
client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour, @kerberos_delegation_token_max_lifetime_hour)
if @httpfs
client.httpfs_mode = true
end
Expand Down Expand Up @@ -542,4 +546,4 @@ def self.register_compressor(name, compressor)
require 'fluent/plugin/webhdfs_compressor_snappy'
require 'fluent/plugin/webhdfs_compressor_hadoop_snappy'
require 'fluent/plugin/webhdfs_compressor_lzo_command'
require 'fluent/plugin/webhdfs_compressor_zstd'
require 'fluent/plugin/webhdfs_compressor_zstd'
20 changes: 16 additions & 4 deletions test/plugin/test_out_webhdfs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def test_time_key_without_buffer_section
})

test "renew_kerberos_delegation_token default" do
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil).once
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil, nil).once

d = create_driver(CONFIG_KERBEROS)

Expand All @@ -337,18 +337,20 @@ def test_time_key_without_buffer_section
kerberos: true,
renew_kerberos_delegation_token: false,
renew_kerberos_delegation_token_interval_hour: nil,
kerberos_delegation_token_max_lifetime_hour: nil,
},
{
kerberos: d.instance.kerberos,
renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"),
renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"),
kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"),
})
end

test "default renew_kerberos_delegation_token_interval" do
expected_hour = 8

mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once
expected_delegation_token_max_lifetime_hour = 7 * 24
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour, expected_delegation_token_max_lifetime_hour).once

d = create_driver(CONFIG_KERBEROS +
config_element("", "", { "renew_kerberos_delegation_token" => true }))
Expand All @@ -359,19 +361,24 @@ def test_time_key_without_buffer_section
renew_kerberos_delegation_token: true,
renew_kerberos_delegation_token_interval: expected_hour * 60 * 60,
renew_kerberos_delegation_token_interval_hour: expected_hour,
kerberos_delegation_token_max_lifetime: expected_delegation_token_max_lifetime_hour * 60 * 60,
kerberos_delegation_token_max_lifetime_hour: expected_delegation_token_max_lifetime_hour,
},
{
kerberos: d.instance.kerberos,
renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"),
renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"),
renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"),
kerberos_delegation_token_max_lifetime: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime"),
kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"),
})
end

test "renew_kerberos_delegation_token_interval" do
expected_hour = 10
expected_delegation_token_max_lifetime_hour = 24

mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour,expected_delegation_token_max_lifetime_hour).once

d = create_driver(
CONFIG_KERBEROS +
Expand All @@ -380,6 +387,7 @@ def test_time_key_without_buffer_section
{
"renew_kerberos_delegation_token" => true,
"renew_kerberos_delegation_token_interval" => "#{expected_hour}h",
"kerberos_delegation_token_max_lifetime" => "#{expected_delegation_token_max_lifetime_hour}h"
}))

assert_equal(
Expand All @@ -388,12 +396,16 @@ def test_time_key_without_buffer_section
renew_kerberos_delegation_token: true,
renew_kerberos_delegation_token_interval: expected_hour * 60 * 60,
renew_kerberos_delegation_token_interval_hour: expected_hour,
kerberos_delegation_token_max_lifetime: expected_delegation_token_max_lifetime_hour * 60 * 60,
kerberos_delegation_token_max_lifetime_hour: expected_delegation_token_max_lifetime_hour
},
{
kerberos: d.instance.kerberos,
renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"),
renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"),
renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"),
kerberos_delegation_token_max_lifetime: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime"),
kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"),
})
end

Expand Down

0 comments on commit ac69465

Please sign in to comment.