Skip to content

Commit

Permalink
Merge pull request #101 from valinurovam/fix-heartbeat-issue
Browse files Browse the repository at this point in the history
Issue-100 Fix heartbeat issue
  • Loading branch information
valinurovam authored Nov 1, 2023
2 parents f28eda8 + a5d9be1 commit ad0c48e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
4 changes: 4 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (consumer *Consumer) retrieveAndSendMessage() {
return
}

if consumer.noAck {
consumer.queue.AckMsg(message)
}

dTag := consumer.channel.NextDeliveryTag()
if !consumer.noAck {
consumer.channel.AddUnackedMessage(dTag, consumer.ConsumerTag, consumer.queue.GetName(), message)
Expand Down
35 changes: 29 additions & 6 deletions server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewConnection(server *Server, netConn *net.TCPConn) (connection *Connection
srvMetrics: server.metrics,
wg: &sync.WaitGroup{},
lastOutgoingTS: make(chan time.Time),
heartbeatInterval: 10,
heartbeatInterval: 60, // default value as RabbitMQ default value
}

connection.logger = log.WithFields(log.Fields{
Expand Down Expand Up @@ -270,9 +270,14 @@ func (conn *Connection) handleOutgoing() {
return
case frame := <-conn.outgoing:
if frame == nil {
conn.logger.Warn("unexpected nil frame")
return
}

if frame.Type == amqp.FrameHeartbeat {
conn.logger.Debug("Outgoing -> Heartbeat")
}

if err = amqp.WriteFrame(buffer, frame); err != nil && !conn.isClosedError(err) {
conn.logger.WithError(err).Warn("writing frame")
return
Expand Down Expand Up @@ -339,7 +344,7 @@ func (conn *Connection) handleIncoming() {
for {
// TODO
// @spec-note
// After sending connection.close , any received methods except Close and Close­OK MUST be discarded.
// After sending connection.close, any received methods except Close and Close­OK MUST be discarded.
// The response to receiving a Close after sending Close must be to send Close­Ok.
frame, err := amqp.ReadFrame(buffer)
if err != nil {
Expand Down Expand Up @@ -381,6 +386,10 @@ func (conn *Connection) handleIncoming() {
return
}

if frame.Type == amqp.FrameHeartbeat {
conn.logger.Debug("Incoming <- Heartbeat")
}

select {
case <-conn.ctx.Done():
close(channel.incoming)
Expand All @@ -391,7 +400,9 @@ func (conn *Connection) handleIncoming() {
}

func (conn *Connection) heartBeater() {
interval := time.Duration(conn.heartbeatInterval) * time.Second
defer conn.wg.Done()

interval := time.Duration(conn.heartbeatInterval) * time.Second / 2
conn.heartbeatTimer = time.NewTicker(interval)

var (
Expand All @@ -401,7 +412,9 @@ func (conn *Connection) heartBeater() {

heartbeatFrame := &amqp.Frame{Type: byte(amqp.FrameHeartbeat), ChannelID: 0, Payload: []byte{}, CloseAfter: false, Sync: true}

conn.wg.Add(1)
go func() {
defer conn.wg.Done()
for {
lastTs, ok = <-conn.lastOutgoingTS
if !ok {
Expand All @@ -410,9 +423,19 @@ func (conn *Connection) heartBeater() {
}
}()

for tickTime := range conn.heartbeatTimer.C {
if tickTime.Sub(lastTs) >= interval-time.Second {
conn.outgoing <- heartbeatFrame
for {
select {
case ts, stillOutgoing := <-conn.lastOutgoingTS:
if !stillOutgoing {
return
}
lastTs = ts
case tickTime := <-conn.heartbeatTimer.C:
if tickTime.Sub(lastTs) >= interval-time.Second {
conn.outgoing <- heartbeatFrame
}
case <-conn.ctx.Done():
return
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/connectionMethods.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (channel *Channel) connectionTuneOk(method *amqp.ConnectionTuneOk) *amqp.Er
channel.conn.heartbeatInterval = method.Heartbeat
}
channel.conn.heartbeatTimeout = channel.conn.heartbeatInterval * 3

channel.conn.wg.Add(1)
go channel.conn.heartBeater()
}

Expand Down

0 comments on commit ad0c48e

Please sign in to comment.