diff --git a/README.rdoc b/README.rdoc index 1f8fd3a..cb16f9c 100644 --- a/README.rdoc +++ b/README.rdoc @@ -54,6 +54,9 @@ See _CHANGELOG.rdoc_ for details. :usecrlf => false, # Use CRLF command and header line ends (1.2+) :max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry :max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry + :fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ... + # For fast heartbeat senders. 'fast' == YMMV. If not + # correct for your environment, expect unnecessary fail overs } # for client diff --git a/examples/slogger.rb b/examples/slogger.rb index c8ea235..278a1e5 100644 --- a/examples/slogger.rb +++ b/examples/slogger.rb @@ -203,7 +203,7 @@ def on_abort(parms, headers) end # Stomp 1.1+ - heart beat read (receive) failed. - def on_hbread_fail(parms, ticker_data) + def on_hbread_fail(parms, ticker_data = {}) begin @log.debug "Hbreadf Parms #{info(parms)}" @log.debug "Hbreadf Result #{ticker_data.inspect}" @@ -213,7 +213,7 @@ def on_hbread_fail(parms, ticker_data) end # Stomp 1.1+ - heart beat send (transmit) failed. - def on_hbwrite_fail(parms, ticker_data) + def on_hbwrite_fail(parms, ticker_data = {}) begin @log.debug "Hbwritef Parms #{info(parms)}" @log.debug "Hbwritef Result #{ticker_data.inspect}" @@ -256,7 +256,7 @@ def on_ssl_connectfail(parms) end # Log heart beat fires - def on_hbfire(parms, srind, curt) + def on_hbfire(parms, srind, firedata = {}) begin @log.debug "HeartBeat Fire Parms #{info(parms)}" @log.debug "HeartBeat Fire Send/Receive #{srind}" diff --git a/lib/connection/heartbeats.rb b/lib/connection/heartbeats.rb index 803c78f..624cb98 100644 --- a/lib/connection/heartbeats.rb +++ b/lib/connection/heartbeats.rb @@ -91,18 +91,28 @@ def _init_heartbeats() def _start_send_ticker() sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs reconn = false + adjust = 0.0 @st = Thread.new { + first_time = true while true do - sleep sleeptime + # + slt = sleeptime - adjust - @fast_hbs_adjust + sleep(slt) next unless @socket # nil under some circumstances ?? curt = Time.now.to_f if @logger && @logger.respond_to?(:on_hbfire) - @logger.on_hbfire(log_params, "send_fire", curt) + @logger.on_hbfire(log_params, "send_fire", :curt => curt, :last_sleep => slt) end delta = curt - @ls - if delta > sleeptime + # Be tolerant (minus), and always do this the first time through. + # Reintroduce logic removed in d922fa. + compval = (@hbsend_interval - (@hbsend_interval/5.0)) / 1000000.0 + if delta > compval || first_time + first_time = false if @logger && @logger.respond_to?(:on_hbfire) - @logger.on_hbfire(log_params, "send_heartbeat", curt) + @logger.on_hbfire(log_params, "send_heartbeat", :last_sleep => slt, + :curt => curt, :last_send => @ls, :delta => delta, + :compval => compval) end # Send a heartbeat @transmit_semaphore.synchronize do @@ -135,6 +145,7 @@ def _start_send_ticker() Thread.exit # This sender thread is done end end + adjust = Time.now.to_f - curt Thread.pass end } @@ -155,17 +166,15 @@ def _start_receive_ticker() rdrdy = _is_ready?(@socket) curt = Time.now.to_f if @logger && @logger.respond_to?(:on_hbfire) - @logger.on_hbfire(log_params, "receive_fire", curt) + @logger.on_hbfire(log_params, "receive_fire", :curt => curt) end - # begin delta = curt - @lr if delta > sleeptime if @logger && @logger.respond_to?(:on_hbfire) - @logger.on_hbfire(log_params, "receive_heartbeat", curt) + @logger.on_hbfire(log_params, "receive_heartbeat", {}) end - # Client code could be off doing something else (that is, no reading of # the socket has been requested by the caller). Try to handle that case. lock = @read_semaphore.try_lock @@ -223,7 +232,6 @@ def _start_receive_ticker() end fail_hard = true end - # Do we want to attempt a retry? if @reliable # Retry on hard fail or max read fails @@ -245,7 +253,6 @@ def _start_receive_ticker() Thread.exit # This receiver thread is done end end - Thread.pass # Prior to next receive loop # end # of the "while true" diff --git a/lib/connection/utils.rb b/lib/connection/utils.rb index 7f99ca1..548bf0b 100644 --- a/lib/connection/utils.rb +++ b/lib/connection/utils.rb @@ -181,6 +181,7 @@ def refine_params(params) :stompconn => false, :max_hbread_fails => 0, :max_hbrlck_fails => 0, + :fast_hbs_adjust => 0.0, } res_params = default_params.merge(params) diff --git a/lib/stomp/client.rb b/lib/stomp/client.rb index 7586e3d..28625c2 100644 --- a/lib/stomp/client.rb +++ b/lib/stomp/client.rb @@ -57,6 +57,7 @@ class Client # :usecrlf => false, # :max_hbread_fails => 0, # :max_hbrlck_fails => 0, + # :fast_hbs_adjust => 0.0, # } # # e.g. c = Stomp::Client.new(hash) diff --git a/lib/stomp/connection.rb b/lib/stomp/connection.rb index 7f5cac1..1fcbc6d 100644 --- a/lib/stomp/connection.rb +++ b/lib/stomp/connection.rb @@ -70,6 +70,7 @@ def self.default_port(ssl) # :usecrlf => false, # :max_hbread_fails => 0, # :max_hbrlck_fails => 0, + # :fast_hbs_adjust => 0.0, # } # # e.g. c = Stomp::Connection.new(hash) @@ -116,6 +117,7 @@ def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reli @usecrlf = false # If true, use \r\n as line ends (1.2 only) @max_hbread_fails = 0 # 0 means never retry for HB read failures @max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures + @fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\// end @@ -150,6 +152,7 @@ def hashed_initialize(params) @usecrlf = @parameters[:usecrlf] @max_hbread_fails = @parameters[:max_hbread_fails] @max_hbrlck_fails = @parameters[:max_hbrlck_fails] + @fast_hbs_adjust = @parameters[:fast_hbs_adjust] #sets the first host to connect change_host end diff --git a/notes/heartbeat_readme.txt b/notes/heartbeat_readme.txt index d2c4ec8..268d79f 100644 --- a/notes/heartbeat_readme.txt +++ b/notes/heartbeat_readme.txt @@ -77,6 +77,18 @@ error). ----------------------------------------------------------- +General advice: + +Set your heartbeat intervals to the maximum possible to obtain your desired +behavior. Do *not* set them at extremely low values even if the broker allows +that. An absurd example: + +heart-beat:1,1 + +which will likely not work well. + +----------------------------------------------------------- + General notes: In your real world apps, think about whether one or both of these parameters @@ -98,6 +110,60 @@ We have done a variety of informal tests here, using both server kill and packet drop strategies as appropriate. We believe more real world testing is required. -We already know that the use of IO#ready? will diminish (probably break) JRuby -functionality. +----------------------------------------------------------- + +08/07/2013 + +Issue #63 related, specifically fast send heart beats are being used and +spurious fail overs occur in rapid succession. + +Background: + +Fail over from heartbeat failures was introduced in gem version 1.2.10. + +Subsequently: + +This issue has been observed and documented in the following environment: + +-- JRuby engine 1.7.4 *and* +-- ActiveMQ 5.8.0 *and* +-- 'fast' client send heartbeats + +Heartbeat sends were at 2000ms. + +At this point in time, fast send heart beats and spurious fail overs have +*not* been observed using: + +-- Any native RUBY_ENGINE and ActiveMQ +-- Any native RUBY_ENGINE and Apollo (client send rates are limited by default) +-- Any native RUBY_ENGINE and RabbitMQ +-- JRuby and Apollo (client send rates are limited by default) +-- JRuby and RabbitMQ + +Note that 'fast' will depend on your use case for heartbeats. Observations +are that sending heartbeat times less than 5000ms might be considered 'fast' +in the targeted environment. + +The solution / bypass being put in place as of the above date was developed +through experimentation and is as follows: + +- Add 'adjustment' logic to the heartbeat sender (thanks to ppaul for this idea). +- Re-introduce tolerance logic removed in d922fa. +- Add a new connection hash parameter to adjust heartbeat sends. + +The newly introduced connection hash parameter is: + +:fast_hbs_adjust => 0.0 # The default, no adjustment to sender sleep times (sec) + +Recommendation for gem users that: + +- Use fast send heartbeats +- Actually notice spurious fail overs + +is to provide a very sender sleep time adjustment when connecting. Examples: + +:fast_hbs_adjust => 0.05 # 50 milliseconds +:fast_hbs_adjust => 0.10 # 100 milliseconds + +As usual, YMMV. diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index 2b4fb12..b471333 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -27,6 +27,7 @@ :usecrlf => false, :max_hbread_fails => 0, :max_hbrlck_fails => 0, + :fast_hbs_adjust => 0.0, } #POG: @@ -89,11 +90,12 @@ def _receive( s ) "backOffMultiplier" => 2, "maxReconnectAttempts" => 0, "randomize" => false, - "connect_timeout" => 0, - "parse_timeout" => 5, + "connectTimeout" => 0, + "parseTimeout" => 5, "usecrlf" => false, - :max_hbread_fails => 0, - :max_hbrlck_fails => 0, + :maxHbreadFails => 0, + :maxHbrlckFails => 0, + :fastHbsAdjust => 0.0, } @connection = Stomp::Connection.new(used_hash) @@ -272,7 +274,9 @@ class SSLContext before(:each) do ssl_parameters = {:hosts => [{:login => "login2", :passcode => "passcode2", :host => "remotehost", :ssl => true}]} - @ssl_socket = mock(:ssl_socket, :puts => nil, :write => nil, :setsockopt => nil, :flush => true) + @ssl_socket = mock(:ssl_socket, :puts => nil, :write => nil, + :setsockopt => nil, :flush => true) + @ssl_socket.stub!(:sync_close=) TCPSocket.should_receive(:open).and_return @tcp_socket OpenSSL::SSL::SSLSocket.should_receive(:new).and_return(@ssl_socket) @@ -344,6 +348,7 @@ class SSLContext :stompconn => false, :max_hbread_fails => 0, :max_hbrlck_fails => 0, + :fast_hbs_adjust => 0.0, } used_hash = { @@ -382,6 +387,7 @@ class SSLContext :usecrlf => true, :max_hbread_fails => 123, :max_hbrlck_fails => 456, + :fast_hbs_adjust => 0.2, } @connection = Stomp::Connection.new(used_hash)