Skip to content

Commit

Permalink
Issue #63, fast spurious fail overs.
Browse files Browse the repository at this point in the history
Added connection parameter to allow clients to supply
an adjustment value to the sender heartbeat sleep time.
See notes/heartbeat_readme.txt for guidance on use.
  • Loading branch information
gmallard committed Aug 9, 2013
1 parent ae79f22 commit bb5cf49
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 20 deletions.
3 changes: 3 additions & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ See _CHANGELOG.rdoc_ for details.
:usecrlf => false, # Use CRLF command and header line ends (1.2+)
:max_hbread_fails => 0, # Max HB read fails before retry. 0 => never retry
:max_hbrlck_fails => 0, # Max HB read lock obtain fails before retry. 0 => never retry
:fast_hbs_adjust => 0.0, # Fast heartbeat senders sleep adjustment, seconds, needed ...
# For fast heartbeat senders. 'fast' == YMMV. If not
# correct for your environment, expect unnecessary fail overs
}

# for client
Expand Down
6 changes: 3 additions & 3 deletions examples/slogger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def on_abort(parms, headers)
end

# Stomp 1.1+ - heart beat read (receive) failed.
def on_hbread_fail(parms, ticker_data)
def on_hbread_fail(parms, ticker_data = {})
begin
@log.debug "Hbreadf Parms #{info(parms)}"
@log.debug "Hbreadf Result #{ticker_data.inspect}"
Expand All @@ -213,7 +213,7 @@ def on_hbread_fail(parms, ticker_data)
end

# Stomp 1.1+ - heart beat send (transmit) failed.
def on_hbwrite_fail(parms, ticker_data)
def on_hbwrite_fail(parms, ticker_data = {})
begin
@log.debug "Hbwritef Parms #{info(parms)}"
@log.debug "Hbwritef Result #{ticker_data.inspect}"
Expand Down Expand Up @@ -256,7 +256,7 @@ def on_ssl_connectfail(parms)
end

# Log heart beat fires
def on_hbfire(parms, srind, curt)
def on_hbfire(parms, srind, firedata = {})
begin
@log.debug "HeartBeat Fire Parms #{info(parms)}"
@log.debug "HeartBeat Fire Send/Receive #{srind}"
Expand Down
27 changes: 17 additions & 10 deletions lib/connection/heartbeats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,28 @@ def _init_heartbeats()
def _start_send_ticker()
sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs
reconn = false
adjust = 0.0
@st = Thread.new {
first_time = true
while true do
sleep sleeptime
#
slt = sleeptime - adjust - @fast_hbs_adjust
sleep(slt)
next unless @socket # nil under some circumstances ??
curt = Time.now.to_f
if @logger && @logger.respond_to?(:on_hbfire)
@logger.on_hbfire(log_params, "send_fire", curt)
@logger.on_hbfire(log_params, "send_fire", :curt => curt, :last_sleep => slt)
end
delta = curt - @ls
if delta > sleeptime
# Be tolerant (minus), and always do this the first time through.
# Reintroduce logic removed in d922fa.
compval = (@hbsend_interval - (@hbsend_interval/5.0)) / 1000000.0
if delta > compval || first_time
first_time = false
if @logger && @logger.respond_to?(:on_hbfire)
@logger.on_hbfire(log_params, "send_heartbeat", curt)
@logger.on_hbfire(log_params, "send_heartbeat", :last_sleep => slt,
:curt => curt, :last_send => @ls, :delta => delta,
:compval => compval)
end
# Send a heartbeat
@transmit_semaphore.synchronize do
Expand Down Expand Up @@ -135,6 +145,7 @@ def _start_send_ticker()
Thread.exit # This sender thread is done
end
end
adjust = Time.now.to_f - curt
Thread.pass
end
}
Expand All @@ -155,17 +166,15 @@ def _start_receive_ticker()
rdrdy = _is_ready?(@socket)
curt = Time.now.to_f
if @logger && @logger.respond_to?(:on_hbfire)
@logger.on_hbfire(log_params, "receive_fire", curt)
@logger.on_hbfire(log_params, "receive_fire", :curt => curt)
end

#
begin
delta = curt - @lr
if delta > sleeptime
if @logger && @logger.respond_to?(:on_hbfire)
@logger.on_hbfire(log_params, "receive_heartbeat", curt)
@logger.on_hbfire(log_params, "receive_heartbeat", {})
end

# Client code could be off doing something else (that is, no reading of
# the socket has been requested by the caller). Try to handle that case.
lock = @read_semaphore.try_lock
Expand Down Expand Up @@ -223,7 +232,6 @@ def _start_receive_ticker()
end
fail_hard = true
end

# Do we want to attempt a retry?
if @reliable
# Retry on hard fail or max read fails
Expand All @@ -245,7 +253,6 @@ def _start_receive_ticker()
Thread.exit # This receiver thread is done
end
end

Thread.pass # Prior to next receive loop
#
end # of the "while true"
Expand Down
1 change: 1 addition & 0 deletions lib/connection/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def refine_params(params)
:stompconn => false,
:max_hbread_fails => 0,
:max_hbrlck_fails => 0,
:fast_hbs_adjust => 0.0,
}

res_params = default_params.merge(params)
Expand Down
1 change: 1 addition & 0 deletions lib/stomp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Client
# :usecrlf => false,
# :max_hbread_fails => 0,
# :max_hbrlck_fails => 0,
# :fast_hbs_adjust => 0.0,
# }
#
# e.g. c = Stomp::Client.new(hash)
Expand Down
3 changes: 3 additions & 0 deletions lib/stomp/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def self.default_port(ssl)
# :usecrlf => false,
# :max_hbread_fails => 0,
# :max_hbrlck_fails => 0,
# :fast_hbs_adjust => 0.0,
# }
#
# e.g. c = Stomp::Connection.new(hash)
Expand Down Expand Up @@ -116,6 +117,7 @@ def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reli
@usecrlf = false # If true, use \r\n as line ends (1.2 only)
@max_hbread_fails = 0 # 0 means never retry for HB read failures
@max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures
@fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment
warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
end

Expand Down Expand Up @@ -150,6 +152,7 @@ def hashed_initialize(params)
@usecrlf = @parameters[:usecrlf]
@max_hbread_fails = @parameters[:max_hbread_fails]
@max_hbrlck_fails = @parameters[:max_hbrlck_fails]
@fast_hbs_adjust = @parameters[:fast_hbs_adjust]
#sets the first host to connect
change_host
end
Expand Down
70 changes: 68 additions & 2 deletions notes/heartbeat_readme.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ error).

-----------------------------------------------------------

General advice:

Set your heartbeat intervals to the maximum possible to obtain your desired
behavior. Do *not* set them at extremely low values even if the broker allows
that. An absurd example:

heart-beat:1,1

which will likely not work well.

-----------------------------------------------------------

General notes:

In your real world apps, think about whether one or both of these parameters
Expand All @@ -98,6 +110,60 @@ We have done a variety of informal tests here, using both server kill and
packet drop strategies as appropriate. We believe more real world testing is
required.

We already know that the use of IO#ready? will diminish (probably break) JRuby
functionality.
-----------------------------------------------------------

08/07/2013

Related to issue #63: when fast send heartbeats are being used, spurious
fail overs can occur in rapid succession.

Background:

Fail over from heartbeat failures was introduced in gem version 1.2.10.

Subsequently:

This issue has been observed and documented in the following environment:

-- JRuby engine 1.7.4 *and*
-- ActiveMQ 5.8.0 *and*
-- 'fast' client send heartbeats

Heartbeat sends were at 2000ms.

At this point in time, fast send heart beats and spurious fail overs have
*not* been observed using:

-- Any native RUBY_ENGINE and ActiveMQ
-- Any native RUBY_ENGINE and Apollo (client send rates are limited by default)
-- Any native RUBY_ENGINE and RabbitMQ
-- JRuby and Apollo (client send rates are limited by default)
-- JRuby and RabbitMQ

Note that 'fast' will depend on your use case for heartbeats. Observations
suggest that heartbeat send intervals of less than 5000ms might be considered
'fast' in the targeted environment.

The solution / bypass being put in place as of the above date was developed
through experimentation and is as follows:

- Add 'adjustment' logic to the heartbeat sender (thanks to ppaul for this idea).
- Re-introduce tolerance logic removed in d922fa.
- Add a new connection hash parameter to adjust heartbeat sends.

The newly introduced connection hash parameter is:

:fast_hbs_adjust => 0.0 # The default, no adjustment to sender sleep times (sec)

Recommendation for gem users that:

- Use fast send heartbeats
- Actually notice spurious fail overs

is to provide a very small sender sleep time adjustment when connecting. Examples:

:fast_hbs_adjust => 0.05 # 50 milliseconds
:fast_hbs_adjust => 0.10 # 100 milliseconds

As usual, YMMV.

16 changes: 11 additions & 5 deletions spec/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
:usecrlf => false,
:max_hbread_fails => 0,
:max_hbrlck_fails => 0,
:fast_hbs_adjust => 0.0,
}

#POG:
Expand Down Expand Up @@ -89,11 +90,12 @@ def _receive( s )
"backOffMultiplier" => 2,
"maxReconnectAttempts" => 0,
"randomize" => false,
"connect_timeout" => 0,
"parse_timeout" => 5,
"connectTimeout" => 0,
"parseTimeout" => 5,
"usecrlf" => false,
:max_hbread_fails => 0,
:max_hbrlck_fails => 0,
:maxHbreadFails => 0,
:maxHbrlckFails => 0,
:fastHbsAdjust => 0.0,
}

@connection = Stomp::Connection.new(used_hash)
Expand Down Expand Up @@ -272,7 +274,9 @@ class SSLContext

before(:each) do
ssl_parameters = {:hosts => [{:login => "login2", :passcode => "passcode2", :host => "remotehost", :ssl => true}]}
@ssl_socket = mock(:ssl_socket, :puts => nil, :write => nil, :setsockopt => nil, :flush => true)
@ssl_socket = mock(:ssl_socket, :puts => nil, :write => nil,
:setsockopt => nil, :flush => true)
@ssl_socket.stub!(:sync_close=)

TCPSocket.should_receive(:open).and_return @tcp_socket
OpenSSL::SSL::SSLSocket.should_receive(:new).and_return(@ssl_socket)
Expand Down Expand Up @@ -344,6 +348,7 @@ class SSLContext
:stompconn => false,
:max_hbread_fails => 0,
:max_hbrlck_fails => 0,
:fast_hbs_adjust => 0.0,
}

used_hash = {
Expand Down Expand Up @@ -382,6 +387,7 @@ class SSLContext
:usecrlf => true,
:max_hbread_fails => 123,
:max_hbrlck_fails => 456,
:fast_hbs_adjust => 0.2,
}

@connection = Stomp::Connection.new(used_hash)
Expand Down

0 comments on commit bb5cf49

Please sign in to comment.