diff --git a/errors.go b/errors.go index 223a98f..4c04991 100644 --- a/errors.go +++ b/errors.go @@ -12,6 +12,8 @@ type EventProcessingError struct { var ErrNilContext = errors.New("received nil context") +var ErrExchangeClosed = errors.New("exchange has been closed") + func NewEventProcessingError(err error) error { return &EventProcessingError{err} } diff --git a/exchange.go b/exchange.go index 346642b..7a24337 100644 --- a/exchange.go +++ b/exchange.go @@ -15,6 +15,7 @@ import ( const ( maxRequeueNo = 3 + defaultMaxConnections = 20 requeueHeaderKey = "requeue-no" retryQueueNameSuffix = "retry" retryExchangeNameSuffix = "retry" @@ -33,158 +34,95 @@ type RabbitExchange interface { Close() error } -type RabbitExchangeImpl struct { - rabbitIni RabbitConfig - rabbitEmitConnection *connectionManager - rabbitConsumeConnection *connectionManager +type connectionChannel struct { + connection *amqp.Connection + channel *amqp.Channel } -type connectionManager struct { +type RabbitExchangeImpl struct { rabbitIni RabbitConfig - rabbitConnectionMutex sync.RWMutex rabbitConnectionConnectTimeout chan int - rabbitConnection *amqp.Connection + connectionPool chan *connectionChannel + maxConnections int + connMu sync.RWMutex } func NewRabbitExchange(rabbitIni RabbitConfig) *RabbitExchangeImpl { - rabbitEmitConnection := &connectionManager{ + re := &RabbitExchangeImpl{ rabbitIni: rabbitIni, rabbitConnectionConnectTimeout: make(chan int, 1), + maxConnections: defaultMaxConnections, + connectionPool: make(chan *connectionChannel, defaultMaxConnections), } - rabbitConsumeConnection := &connectionManager{ - rabbitIni: rabbitIni, - rabbitConnectionConnectTimeout: make(chan int, 1), - } - return &RabbitExchangeImpl{ - rabbitIni: rabbitIni, - rabbitEmitConnection: rabbitEmitConnection, - rabbitConsumeConnection: rabbitConsumeConnection, - } + return re } func (re *RabbitExchangeImpl) Close() error { - err := re.rabbitEmitConnection.closeConnection() - if err != nil { - return err - } - return re.rabbitConsumeConnection.closeConnection() -} - -func (re *RabbitExchangeImpl) SendTo(name, exchangeType string, durable, autoDelete bool, key string) MessageHandleFunc { - - var conn *amqp.Connection - var ch *amqp.Channel - var version int64 = 0 + re.connMu.Lock() + conns := re.connectionPool + re.connectionPool = nil + re.connMu.Unlock() - var connM sync.RWMutex - var chM sync.RWMutex - - getConnection := func() (*amqp.Connection, error) { - currentConn := func() *amqp.Connection { - connM.RLock() - defer connM.RUnlock() - if conn != nil && !conn.IsClosed() { - return conn - } - return nil - }() - - if currentConn != nil { - return currentConn, nil - } - - return func() (*amqp.Connection, error) { - connM.Lock() - defer connM.Unlock() - if conn == nil || conn.IsClosed() { - var err error - conn, err = re.rabbitEmitConnection.newEventConnection(conn, re.rabbitIni) - if err != nil { - return nil, errors.Wrapf(err, "rabbit:Failed to reconnect to rabbit %+v", err) - } - ch = nil - - } - return conn, nil - }() + if conns == nil { + return ErrExchangeClosed } - getChannel := func() (*amqp.Channel, int64, error) { - currentChannel, currentVersion := func() (*amqp.Channel, int64) { - chM.RLock() - defer chM.RUnlock() - return ch, version - }() - if currentChannel != nil { - return currentChannel, currentVersion, nil + close(conns) + for conn := range conns { + err := closeConnectionAndChannel(conn) + if err != nil { + return err } - - return func() (*amqp.Channel, int64, error) { - conn, err := getConnection() - if err != nil { - return nil, 0, err - } - - chM.Lock() - defer chM.Unlock() - - if ch == nil { - var err error - ch, err = conn.Channel() - if err != nil { - return nil, 0, err - } - version++ - err = ch.ExchangeDeclare( - name, // name - exchangeType, // type - durable, // durable - autoDelete, // delete when unused - false, // exclusive - false, // no-wait - nil, // args - ) - - if err != nil { - return nil, 0, err - } - } - return ch, version, nil - }() } + return nil +} - replaceChannel := func(oldVersion int64) { - chM.Lock() - defer chM.Unlock() - if oldVersion == version { - if ch != nil { - ch.Close() - ch = nil - } +func (re *RabbitExchangeImpl) SendTo(name, exchangeType string, durable, autoDelete bool, key string) MessageHandleFunc { + // Takes a connection+channel from the pool every time we call MessageHandleFunc + return func(ctx context.Context, message []byte) error { + var err, firstError error + var try int + conn, err := re.getEventConnection() + if err != nil { + return err } + defer re.releaseConn(conn) - } - - return func(ctx context.Context, message []byte) error { if ctx == nil { return ErrNilContext } - if ctx.Err() != nil { return ctx.Err() } - var firstError error - var try int + // Not sure why these tries are here :( for try = 1; try <= 3; try++ { if ctx.Err() != nil { return ctx.Err() } - ch, chVersion, err := getChannel() + if conn == nil { + conn, err = re.getEventConnection() + if err != nil { + if firstError == nil { + firstError = errors.Wrapf(err, "getEventConnection Failed after try %v", try) + } + continue + } + } + + err := conn.channel.ExchangeDeclare( + name, // name + exchangeType, // type + durable, // durable + autoDelete, // delete when unused + false, // exclusive + false, // no-wait + nil, // args + ) if err != nil { if firstError == nil { - firstError = errors.Wrapf(err, "getChannel Failed after try %v", try) + firstError = errors.Wrapf(err, "ExchangeDeclare Failed after try %v", try) } continue } @@ -194,7 +132,7 @@ func (re *RabbitExchangeImpl) SendTo(name, exchangeType string, durable, autoDel dm = amqp.Persistent } - err = ch.Publish( + err = conn.channel.Publish( name, // exchange key, // routing key false, // mandatory @@ -209,14 +147,11 @@ func (re *RabbitExchangeImpl) SendTo(name, exchangeType string, durable, autoDel if firstError == nil { firstError = errors.Wrapf(err, "ch.Publish Failed after try %v", try) } - replaceChannel(chVersion) + conn = nil continue } - return nil - } - return errors.Wrapf(firstError, "rabbit:Failed to send message after %v tries", try) } } @@ -291,20 +226,14 @@ func (re *RabbitExchangeImpl) getConsumeChannel( queue QueueSettings, retryExchangeName string, ) (*amqp.Channel, <-chan amqp.Delivery, chan *amqp.Error, func(), error) { - conn, err := re.rabbitConsumeConnection.getEventConnection() + conn, err := re.getEventConnection() + defer re.releaseConn(conn) if err != nil { return nil, nil, nil, func() {}, err } - if conn.IsClosed() { - conn, err = re.rabbitConsumeConnection.newEventConnection(conn, re.rabbitIni) - if err != nil { - return nil, nil, nil, func() {}, err - } - } - - ch, err := conn.Channel() + ch := conn.channel if err != nil { return nil, nil, nil, func() {}, err } @@ -708,60 +637,86 @@ func getRequeueQueueName(queueName string, expiry string) string { /* getEventConnection get the connection, creating if not exists */ -func (cm *connectionManager) getEventConnection() (*amqp.Connection, error) { - ec := cm.readEventConnection() - if ec != nil { - return ec, nil +func (re *RabbitExchangeImpl) getEventConnection() (*connectionChannel, error) { + conns := re.getConns() + if conns == nil { + return nil, ErrExchangeClosed + } + select { + case conn := <-conns: + if conn == nil { + return nil, ErrExchangeClosed + } + if conn.connection.IsClosed() { + return re.newEventConnectionAndChannel(re.rabbitIni) + } + return conn, nil + default: + return re.newEventConnectionAndChannel(re.rabbitIni) } - return cm.newEventConnection(ec, cm.rabbitIni) - } -func (cm *connectionManager) readEventConnection() *amqp.Connection { - cm.rabbitConnectionMutex.RLock() - defer cm.rabbitConnectionMutex.RUnlock() - return cm.rabbitConnection +func closeConnectionAndChannel(conn *connectionChannel) error { + err := conn.channel.Close() + if err != nil { + return err + } + return conn.connection.Close() } -/* - newEventConnection creates new connection to rabbit and sets re.rabbitConnection +func (re *RabbitExchangeImpl) releaseConn(conn *connectionChannel) error { + if conn == nil || conn.channel == nil || conn.connection == nil || conn.connection.IsClosed() { + return nil + } - It uses rabbitConnectionConnectTimeout as a semaphore with timeout to prevent many go routines waiting to try to connect. - It still needs to lock rabbitConnectionMutex that is used for faster read access -*/ -func (cm *connectionManager) newEventConnection(old *amqp.Connection, rabbitIni RabbitConfig) (*amqp.Connection, error) { + re.connMu.RLock() + defer re.connMu.RUnlock() + + if re.connectionPool == nil { + return closeConnectionAndChannel(conn) + } + + select { + case re.connectionPool <- conn: + return nil + default: + return closeConnectionAndChannel(conn) + } +} + +func (re *RabbitExchangeImpl) getConns() chan *connectionChannel { + re.connMu.RLock() + conns := re.connectionPool + re.connMu.RUnlock() + return conns +} +func (re *RabbitExchangeImpl) newEventConnectionAndChannel(rabbitIni RabbitConfig) (*connectionChannel, error) { timer := time.NewTimer(rabbitIni.GetConnectTimeout()) defer timer.Stop() + result := make(chan *connectionChannel, 1) + errChan := make(chan error, 1) + go func() { + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s/", rabbitIni.GetUserName(), rabbitIni.GetPassword(), rabbitIni.GetHost())) + if err != nil { + errChan <- err + return + } + channel, err := conn.Channel() + if err != nil { + errChan <- err + return + } + result <- &connectionChannel{connection: conn, channel: channel} + }() select { - case cm.rabbitConnectionConnectTimeout <- 0: + case conn := <-result: { - defer func() { <-cm.rabbitConnectionConnectTimeout }() - - current := cm.readEventConnection() - if current != old { - return current, nil - } - - cm.rabbitConnectionMutex.Lock() - defer cm.rabbitConnectionMutex.Unlock() - - conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s/", rabbitIni.GetUserName(), rabbitIni.GetPassword(), rabbitIni.GetHost())) - - if err != nil { - return nil, err - } - - go func() { - for blocked := range conn.NotifyBlocked(make(chan amqp.Blocking)) { - if blocked.Active { - log.Printf("rabbit:eventConnection server is blocked because %s", blocked.Reason) - } - } - }() - - cm.rabbitConnection = conn - return cm.rabbitConnection, nil + return conn, nil + } + case err := <-errChan: + { + return nil, err } case <-timer.C: { @@ -770,13 +725,6 @@ func (cm *connectionManager) newEventConnection(old *amqp.Connection, rabbitIni } } -func (cm *connectionManager) closeConnection() error { - if cm.rabbitConnection != nil && !cm.rabbitConnection.IsClosed() { - return cm.rabbitConnection.Close() - } - return nil -} - func Fanout(listen func(MessageHandleFunc) error) (func(MessageHandleFunc) func(), error) { listeners := make(map[int64]MessageHandleFunc) lock := sync.RWMutex{} diff --git a/test/rabbit_events_test.go b/test/rabbit_events_test.go index 696b0d6..8f6e26a 100644 --- a/test/rabbit_events_test.go +++ b/test/rabbit_events_test.go @@ -166,10 +166,10 @@ var _ = Describe("RabbitEvents", func() { ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() mockConfig := NewMockRabbitConfig(ctrl) - mockConfig.EXPECT().GetHost().Return("localhost:5672").Times(2) - mockConfig.EXPECT().GetUserName().Return("guest").Times(2) - mockConfig.EXPECT().GetPassword().Return("guest").Times(2) - mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5).AnyTimes() + mockConfig.EXPECT().GetHost().Return("localhost:5672") + mockConfig.EXPECT().GetUserName().Return("guest") + mockConfig.EXPECT().GetPassword().Return("guest") + mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5) exchange := NewRabbitExchange(mockConfig) handler, closeHandler, err := exchange.ReceiveFrom( @@ -215,9 +215,9 @@ var _ = Describe("RabbitEvents", func() { ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() mockConfig := NewMockRabbitConfig(ctrl) - mockConfig.EXPECT().GetHost().Return("localhost:5672").Times(2) - mockConfig.EXPECT().GetUserName().Return("guest").Times(2) - mockConfig.EXPECT().GetPassword().Return("guest").Times(2) + mockConfig.EXPECT().GetHost().Return("localhost:5672") + mockConfig.EXPECT().GetUserName().Return("guest") + mockConfig.EXPECT().GetPassword().Return("guest") mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5).AnyTimes() exchange := NewRabbitExchange(mockConfig) @@ -263,9 +263,9 @@ var _ = Describe("RabbitEvents", func() { ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() mockConfig := NewMockRabbitConfig(ctrl) - mockConfig.EXPECT().GetHost().Return("localhost:5672").Times(2) - mockConfig.EXPECT().GetUserName().Return("guest").Times(2) - mockConfig.EXPECT().GetPassword().Return("guest").Times(2) + mockConfig.EXPECT().GetHost().Return("localhost:5672") + mockConfig.EXPECT().GetUserName().Return("guest") + mockConfig.EXPECT().GetPassword().Return("guest") mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5).AnyTimes() exchange := NewRabbitExchange(mockConfig) @@ -311,9 +311,9 @@ var _ = Describe("RabbitEvents", func() { ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() mockConfig := NewMockRabbitConfig(ctrl) - mockConfig.EXPECT().GetHost().Return("localhost:5672").Times(2) - mockConfig.EXPECT().GetUserName().Return("guest").Times(2) - mockConfig.EXPECT().GetPassword().Return("guest").Times(2) + mockConfig.EXPECT().GetHost().Return("localhost:5672") + mockConfig.EXPECT().GetUserName().Return("guest") + mockConfig.EXPECT().GetPassword().Return("guest") mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5).AnyTimes() exchange := NewRabbitExchange(mockConfig) @@ -362,9 +362,9 @@ var _ = Describe("RabbitEvents", func() { ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() mockConfig := NewMockRabbitConfig(ctrl) - mockConfig.EXPECT().GetHost().Return("localhost:5672").Times(2) - mockConfig.EXPECT().GetUserName().Return("guest").Times(2) - mockConfig.EXPECT().GetPassword().Return("guest").Times(2) + mockConfig.EXPECT().GetHost().Return("localhost:5672") + mockConfig.EXPECT().GetUserName().Return("guest") + mockConfig.EXPECT().GetPassword().Return("guest") mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5).AnyTimes() exchange := NewRabbitExchange(mockConfig) @@ -415,9 +415,9 @@ var _ = Describe("RabbitEvents", func() { ctrl := gomock.NewController(GinkgoT()) defer ctrl.Finish() mockConfig := NewMockRabbitConfig(ctrl) - mockConfig.EXPECT().GetHost().Return("localhost:5672").Times(2) - mockConfig.EXPECT().GetUserName().Return("guest").Times(2) - mockConfig.EXPECT().GetPassword().Return("guest").Times(2) + mockConfig.EXPECT().GetHost().Return("localhost:5672") + mockConfig.EXPECT().GetUserName().Return("guest") + mockConfig.EXPECT().GetPassword().Return("guest") mockConfig.EXPECT().GetConnectTimeout().Return(time.Second * 5).AnyTimes() exchange := NewRabbitExchange(mockConfig)