Skip to content

Commit

Permalink
Remove dead connections from the pool on connection errors.
Browse files Browse the repository at this point in the history
With the introduction of Yugabyte, there is now the possibility that
connections in the pool may become "dead" when new nodes are rotated
into a cluster, since they end up pointing at a node which no longer
exists. In this case, we can proactively remove these connections from
the connection pool when we see the corresponding exceptions.

As long as jobs are retryable the work will still eventually be
processed - this merely prevents us from being stuck in a pathological
case where the pool contains dead connections and nothing is
automatically clearing them out (e.g. a deployment killing the pod).
  • Loading branch information
benk-gc committed Oct 25, 2024
1 parent 5b3d374 commit 88f7248
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ RSpec/IndexedLet:

RSpec/NestedGroups:
Max: 5

Sequel/IrreversibleMigration:
Exclude:
- "**/*_spec.rb"
14 changes: 13 additions & 1 deletion lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ def work
:job_worked
end
end
rescue PG::Error, Adapters::UnavailableConnection => _e
rescue PG::Error, Adapters::UnavailableConnection => e
remove_dead_connections(e)

# In the event that our Postgres connection is bad, we don't want that error to halt
# the work loop. Instead, we should let the work loop sleep and retry.
:postgres_error
Expand Down Expand Up @@ -306,5 +308,15 @@ def actual_job_class_name(class_name, args)

class_name
end

def remove_dead_connections(exception)
cause = exception.is_a?(PG::Error) ? exception : exception.cause

return unless cause.instance_of?(PG::UnableToSend) || cause.instance_of?(PG::ConnectionBad)

::ActiveRecord::Base.connection_pool.connections.
find { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context }.
then { |failed| failed.pool.remove(failed) }
end
end
end
21 changes: 21 additions & 0 deletions spec/lib/que/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@
end
end

context "when postgres raises a bad connection error while processing a job" do
before do
allow(Que).to receive(:execute).
with(:lock_job, ["default", 0]).
and_raise(PG::ConnectionBad)
end

it "rescues it and returns an error" do
FakeJob.enqueue(1)

expect(work).to eq(:postgres_error)
end

it "removes the connection from the connection pool" do
pool = ActiveRecord::Base.connection_pool
pool.disconnect! && pool.connection

expect { work }.to change { pool.connections.count }.from(1).to(0)
end
end

context "when we time out checking out a new connection" do
it "rescues it and returns an error" do
FakeJob.enqueue(1)
Expand Down

0 comments on commit 88f7248

Please sign in to comment.