From 88f7248b6d29c7c9068365898b884bd090349ab7 Mon Sep 17 00:00:00 2001
From: benk-gc
Date: Thu, 24 Oct 2024 22:00:38 +0100
Subject: [PATCH] Remove dead connections from the pool on connection errors.

With the introduction of Yugabyte, there is now the possibility that
connections in the pool may become "dead" when new nodes are rotated
into a cluster, since they end up pointing at a node which no longer
exists.

In this case, we can proactively remove these connections from the
connection pool when we see the corresponding exceptions.

As long as jobs are retryable the work will still eventually be
processed - this merely prevents us from being stuck in a pathological
case where the pool contains dead connections and nothing is
automatically clearing them out (e.g. a deployment killing the pod).
---
 .rubocop.yml                |  4 ++++
 lib/que/worker.rb           | 19 ++++++++++++++++++-
 spec/lib/que/worker_spec.rb | 21 +++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/.rubocop.yml b/.rubocop.yml
index 1758b32..9b4d464 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -25,3 +25,7 @@ RSpec/IndexedLet:
 
 RSpec/NestedGroups:
   Max: 5
+
+Sequel/IrreversibleMigration:
+  Exclude:
+    - "**/*_spec.rb"
diff --git a/lib/que/worker.rb b/lib/que/worker.rb
index 2f644ec..af39de4 100644
--- a/lib/que/worker.rb
+++ b/lib/que/worker.rb
@@ -261,7 +261,9 @@ def work
           :job_worked
         end
       end
-    rescue PG::Error, Adapters::UnavailableConnection => _e
+    rescue PG::Error, Adapters::UnavailableConnection => e
+      remove_dead_connections(e)
+
       # In the event that our Postgres connection is bad, we don't want that error to halt
       # the work loop. Instead, we should let the work loop sleep and retry.
       :postgres_error
@@ -306,5 +308,20 @@ def actual_job_class_name(class_name, args)
 
       class_name
     end
+
+    # Removes the current execution context's connection from the ActiveRecord
+    # pool when the error is exactly PG::UnableToSend or PG::ConnectionBad.
+    # Exceptions that are not a PG::Error are inspected via their `cause`.
+    def remove_dead_connections(exception)
+      cause = exception.is_a?(PG::Error) ? exception : exception.cause
+
+      return unless cause.instance_of?(PG::UnableToSend) || cause.instance_of?(PG::ConnectionBad)
+
+      # `find` returns nil when no pooled connection is owned by this context;
+      # skip removal then rather than raising NoMethodError from the rescue path.
+      failed = ::ActiveRecord::Base.connection_pool.connections.
+        find { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context }
+      failed.pool.remove(failed) if failed
+    end
   end
 end
diff --git a/spec/lib/que/worker_spec.rb b/spec/lib/que/worker_spec.rb
index 400ec0f..0f2c543 100644
--- a/spec/lib/que/worker_spec.rb
+++ b/spec/lib/que/worker_spec.rb
@@ -216,6 +216,27 @@
         end
       end
 
+      context "when postgres raises a bad connection error while processing a job" do
+        before do
+          allow(Que).to receive(:execute).
+            with(:lock_job, ["default", 0]).
+            and_raise(PG::ConnectionBad)
+        end
+
+        it "rescues it and returns an error" do
+          FakeJob.enqueue(1)
+
+          expect(work).to eq(:postgres_error)
+        end
+
+        it "removes the connection from the connection pool" do
+          pool = ActiveRecord::Base.connection_pool
+          pool.disconnect! && pool.connection
+
+          expect { work }.to change { pool.connections.count }.from(1).to(0)
+        end
+      end
+
       context "when we time out checking out a new connection" do
         it "rescues it and returns an error" do
           FakeJob.enqueue(1)