diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index d49032f34a1..91b90c3772f 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -92,6 +92,7 @@ type RefreshCheck func() (bool, error) type Config[C Connection] struct { Capacity int64 + MaxIdleCount int64 IdleTimeout time.Duration MaxLifetime time.Duration RefreshInterval time.Duration @@ -123,6 +124,8 @@ type ConnPool[C Connection] struct { active atomic.Int64 // capacity is the maximum number of connections that this pool can open capacity atomic.Int64 + // maxIdleCount is the maximum idle connections in the pool + idleCount atomic.Int64 // workers is a waitgroup for all the currently running worker goroutines workers sync.WaitGroup @@ -138,6 +141,8 @@ type ConnPool[C Connection] struct { // maxCapacity is the maximum value to which capacity can be set; when the pool // is re-opened, it defaults to this capacity maxCapacity int64 + // maxIdleCount is the maximum idle connections in the pool + maxIdleCount int64 // maxLifetime is the maximum time a connection can be open maxLifetime atomic.Int64 // idleTimeout is the maximum time a connection can remain idle @@ -158,6 +163,7 @@ func NewPool[C Connection](config *Config[C]) *ConnPool[C] { pool := &ConnPool[C]{} pool.freshSettingsStack.Store(-1) pool.config.maxCapacity = config.Capacity + pool.config.maxIdleCount = config.MaxIdleCount pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds()) pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds()) pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds()) @@ -192,6 +198,7 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration func (pool *ConnPool[C]) open() { pool.close = make(chan struct{}) pool.capacity.Store(pool.config.maxCapacity) + pool.setIdleCount() // The expire worker takes care of removing from the waiter list any clients whose // context has been cancelled. @@ -315,6 +322,16 @@ func (pool *ConnPool[C]) MaxCapacity() int64 { return pool.config.maxCapacity } +func (pool *ConnPool[C]) setIdleCount() { + capacity := pool.Capacity() + maxIdleCount := pool.config.maxIdleCount + if maxIdleCount == 0 || maxIdleCount > capacity { + pool.idleCount.Store(capacity) + } else { + pool.idleCount.Store(maxIdleCount) + } +} + // InUse returns the number of connections that the pool has lent out to clients and that // haven't been returned yet. func (pool *ConnPool[C]) InUse() int64 { @@ -396,6 +413,17 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) { } if !pool.wait.tryReturnConn(conn) { + // Option 1: do not care if more connections are closed than the allowed idle count + if pool.active.Load() > pool.idleCount.Load() { + conn.Close() + pool.closedConn() + return + } + // Option 2: precisely maintain the idle count + if pool.tryClose(conn) { + return + } + connSetting := conn.Conn.Setting() if connSetting == nil { pool.clean.Push(conn) @@ -407,6 +435,19 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) { } } +func (pool *ConnPool[C]) tryClose(conn *Pooled[C]) bool { + for { + open := pool.active.Load() + if open <= pool.idleCount.Load() { + return false + } + if pool.active.CompareAndSwap(open, open-1) { + conn.Close() + return true + } + } +} + func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration { maxLifetime := pool.config.maxLifetime.Load() if maxLifetime == 0 { @@ -629,6 +670,7 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error { if oldcap == newcap { return nil } + pool.setIdleCount() const delay = 10 * time.Millisecond