From 2b439cb8476b3dcc2794b8fb78fa42f7acb122bd Mon Sep 17 00:00:00 2001 From: Aaron Mildenstein Date: Mon, 7 Dec 2015 13:57:52 -0700 Subject: [PATCH] Set application timezone for zoneless datestamps Fortunately, the Sequel gem already had the necessary functionality. However, it turns out that Sequel _can_ do named timezones, but when you do, the date comes back as a DateTime object instead. As a result, this should only use named_timezone support if defined, which should save cycles for those who already have their date fields in UTC, rather than punishing every user. Use string date to test timezone conversion to guarantee similarity to the issue case Fixes #95 --- CHANGELOG.md | 3 +- lib/logstash/plugin_mixins/jdbc.rb | 34 ++++++++++--- logstash-input-jdbc.gemspec | 1 + spec/inputs/jdbc_spec.rb | 78 +++++++++++++++++++++++++++++- 4 files changed, 107 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 064b410..3543143 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## 2.1.0 - [#85](https://github.com/logstash-plugins/logstash-input-jdbc/issues/85) make the jdbc_driver_library accept a list of elements separated by commas as in some situations we might need to load more than one jar/lib. + - [#89](https://github.com/logstash-plugins/logstash-input-jdbc/issues/89) Set application timezone for cases where time fields in data have no timezone. ## 2.0.5 - [#77](https://github.com/logstash-plugins/logstash-input-jdbc/issues/77) Time represented as RubyTime and not as Logstash::Timestamp @@ -13,7 +14,7 @@ - Added catch-all configuration option for any other options that Sequel lib supports ## 2.0.0 - - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, + - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, instead of using Thread.raise on the plugins' threads. 
Ref: https://github.com/elastic/logstash/pull/3895 - Dependency on logstash-core update to 2.0 diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 897a1ef..e244115 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -1,8 +1,10 @@ # encoding: utf-8 # TAKEN FROM WIIBAA require "logstash/config/mixin" +require "time" +require "date" -# Tentative of abstracting JDBC logic to a mixin +# Tentative of abstracting JDBC logic to a mixin # for potential reuse in other plugins (input/output) module LogStash::PluginMixins::Jdbc @@ -45,7 +47,7 @@ def setup_jdbc_config # result-set. The limit size is set with `jdbc_page_size`. # # Be aware that ordering is not guaranteed between queries. - config :jdbc_paging_enabled, :validate => :boolean, :default => false + config :jdbc_paging_enabled, :validate => :boolean, :default => false # JDBC page size config :jdbc_page_size, :validate => :number, :default => 100000 @@ -65,6 +67,15 @@ def setup_jdbc_config # The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5) config :jdbc_pool_timeout, :validate => :number, :default => 5 + # Timezone conversion. + # SQL does not allow for timezone data in timestamp fields. This plugin will automatically + # convert your SQL timestamp fields to Logstash timestamps, in relative UTC time in ISO8601 format. + # + # Using this setting will manually assign a specified timezone offset, instead + # of using the timezone setting of the local machine. You must use a canonical + # timezone, *America/Denver*, for example. + config :jdbc_default_timezone, :validate => :string + # General/Vendor-specific Sequel configuration options. 
# # An example of an optional connection pool configuration @@ -114,12 +125,12 @@ def prepare_jdbc_connection require "sequel" require "sequel/adapters/jdbc" load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library - + begin Sequel::JDBC.load_driver(@jdbc_driver_class) rescue Sequel::AdapterNotFound => e message = if @jdbc_driver_library.nil? - ":jdbc_driver_library is not set, are you sure you included + ":jdbc_driver_library is not set, are you sure you included the proper driver client libraries in your classpath?" else "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" @@ -128,6 +139,10 @@ def prepare_jdbc_connection end @database = jdbc_connect() @database.extension(:pagination) + if @jdbc_default_timezone + @database.extension(:named_timezones) + @database.timezone = @jdbc_default_timezone + end if @jdbc_validate_connection @database.extension(:connection_validator) @database.pool.connection_validation_timeout = @jdbc_validation_timeout @@ -152,7 +167,7 @@ def close_jdbc_connection public def execute_statement(statement, parameters) success = false - begin + begin parameters = symbolized_params(parameters) query = @database[statement, parameters] @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) @@ -177,9 +192,9 @@ def execute_statement(statement, parameters) end # Symbolize parameters keys to use with Sequel - private + private def symbolized_params(parameters) - parameters.inject({}) do |hash,(k,v)| + parameters.inject({}) do |hash,(k,v)| case v when LogStash::Timestamp hash[k.to_sym] = v.time @@ -198,9 +213,14 @@ def extract_values_from(row) private def decorate_value(value) + if value.is_a?(Time) # transform it to LogStash::Timestamp as required by LS LogStash::Timestamp.new(value) + elsif value.is_a?(DateTime) + # Manual timezone conversion detected. + # This is slower, so we put it in as a conditional case. 
+ LogStash::Timestamp.new(Time.parse(value.to_s)) else value # no-op end diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index 4de22a5..4536872 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" s.add_runtime_dependency 'logstash-codec-plain' s.add_runtime_dependency 'sequel' + s.add_runtime_dependency 'tzinfo' s.add_runtime_dependency 'rufus-scheduler' s.add_development_dependency 'logstash-devutils' diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 31cdbf3..be1c8ed 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -5,10 +5,12 @@ require "sequel/adapters/jdbc" require "timecop" require "stud/temporary" +require "time" +require "date" describe LogStash::Inputs::Jdbc do let(:mixin_settings) do - { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"} end let(:settings) { {} } @@ -216,6 +218,80 @@ end end + context "when fetching time data with jdbc_default_timezone set" do + let(:mixin_settings) do + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true", + "jdbc_default_timezone" => "America/Chicago" + } + end + + let(:settings) do + { + "statement" => "SELECT * from test_table", + } + end + + let(:num_rows) { 10 } + + before do + stub_const('ENV', ENV.to_hash.merge('TZ' => 'UTC')) + num_rows.times do + db[:test_table].insert(:num => 1, :custom_time => "2015-01-01 12:00:00", :created_at => Time.now.utc) + end + + plugin.register + end + + after do + plugin.stop + end + + it "should convert the time to reflect the timezone " do + plugin.run(queue) + event = 
queue.pop + # This reflects a 6 hour time difference between UTC and America/Chicago + expect(event["custom_time"].time).to eq(Time.iso8601("2015-01-01T18:00:00Z")) + end + end + + context "when fetching time data without jdbc_default_timezone set" do + + let(:mixin_settings) do + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true" + } + end + + let(:settings) do + { + "statement" => "SELECT * from test_table", + } + end + + let(:num_rows) { 1 } + + before do + stub_const('ENV', ENV.to_hash.merge('TZ' => 'UTC')) + num_rows.times do + db.run "INSERT INTO test_table (created_at, num, custom_time) VALUES (TIMESTAMP('2015-01-01 12:00:00'), 1, TIMESTAMP('2015-01-01 12:00:00'))" + end + + plugin.register + end + + after do + plugin.stop + end + + it "should not convert the time to reflect the timezone " do + plugin.run(queue) + event = queue.pop + # With no timezone set, no change should occur + expect(event["custom_time"].time).to eq(Time.iso8601("2015-01-01T12:00:00Z")) + end + end + context "when iteratively running plugin#run" do let(:settings) do {"statement" => "SELECT num, created_at FROM test_table WHERE created_at > :sql_last_start"}