From fda3767b4ea26c70d8f71cc6253ce1ab3b7f686f Mon Sep 17 00:00:00 2001 From: Jeff Doering Date: Fri, 3 May 2019 13:47:47 -0700 Subject: [PATCH] New feature to track query blockers. --- guides/Linux.md | 10 +- guides/Rails.md | 6 + lib/pghero.rb | 22 +- lib/pghero/base_database.rb | 15 ++ lib/pghero/connection.rb | 5 - lib/pghero/database.rb | 24 +- lib/pghero/methods/basic.rb | 188 ++++++++++----- lib/pghero/methods/connections.rb | 18 +- lib/pghero/methods/maintenance.rb | 2 +- lib/pghero/methods/queries.rb | 2 +- lib/pghero/methods/query_blockers.rb | 234 +++++++++++++++++++ lib/pghero/methods/query_blockers_history.rb | 98 ++++++++ lib/pghero/methods/query_stats.rb | 12 +- lib/pghero/methods/replication.rb | 4 +- lib/pghero/methods/repository.rb | 79 +++++++ lib/pghero/methods/settings.rb | 2 +- lib/pghero/methods/space.rb | 8 +- lib/pghero/methods/type_const.rb | 66 ++++++ lib/pghero/pg_const.rb | 8 + lib/pghero/query_stats.rb | 7 - lib/pghero/repository.rb | 27 +++ lib/pghero/version.rb | 2 + lib/tasks/pghero.rake | 12 +- test/basic_test.rb | 2 +- test/capture_test.rb | 30 ++- test/query_blockers_test.rb | 54 +++++ test/test_helper.rb | 13 ++ 27 files changed, 837 insertions(+), 113 deletions(-) create mode 100644 lib/pghero/base_database.rb delete mode 100644 lib/pghero/connection.rb create mode 100644 lib/pghero/methods/query_blockers.rb create mode 100644 lib/pghero/methods/query_blockers_history.rb create mode 100644 lib/pghero/methods/repository.rb create mode 100644 lib/pghero/methods/type_const.rb create mode 100644 lib/pghero/pg_const.rb delete mode 100644 lib/pghero/query_stats.rb create mode 100644 lib/pghero/repository.rb create mode 100644 test/query_blockers_test.rb diff --git a/guides/Linux.md b/guides/Linux.md index f9ccf34c5..05edc1f43 100644 --- a/guides/Linux.md +++ b/guides/Linux.md @@ -225,7 +225,7 @@ CREATE TABLE "pghero_connection_stats" ( "username" text, "captured_at" timestamp ); -CREATE INDEX "pghero_connection_stats" ("database", "captured_at"); +CREATE INDEX ON "pghero_connection_stats" ("database", "captured_at"); ``` Schedule the task below to run once a day. @@ -234,6 +234,14 @@ Schedule the task below to run once a day. sudo pghero run rake pghero:capture_connection_stats ``` +## Historical Query Blockers + +To track query blockers over time, create a table to store them. + +TODO +```sql +CREATE TABLE ... +``` ## System Stats diff --git a/guides/Rails.md b/guides/Rails.md index 8cebf9662..ec5afef0c 100644 --- a/guides/Rails.md +++ b/guides/Rails.md @@ -303,6 +303,12 @@ PgHero.drop_user("ganondorf") ## Upgrading +### 2.x.y + +New features + +Sample blocker queries - TODO + ### 2.0.0 New features diff --git a/lib/pghero.rb b/lib/pghero.rb index eef93b571..1b71a03e9 100644 --- a/lib/pghero.rb +++ b/lib/pghero.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # dependencies require "active_support" @@ -9,8 +11,11 @@ require "pghero/methods/kill" require "pghero/methods/maintenance" require "pghero/methods/queries" +require "pghero/methods/query_blockers" +require "pghero/methods/query_blockers_history" require "pghero/methods/query_stats" require "pghero/methods/replication" +require "pghero/methods/repository" require "pghero/methods/sequences" require "pghero/methods/settings" require "pghero/methods/space" @@ -19,13 +24,14 @@ require "pghero/methods/tables" require "pghero/methods/users" +require "pghero/base_database" require "pghero/database" require "pghero/engine" if defined?(Rails) +require "pghero/pg_const" +require "pghero/repository" require "pghero/version" module PgHero - autoload :Connection, "pghero/connection" - autoload :QueryStats, "pghero/query_stats" class Error < StandardError; end class NotEnabled < Error; end @@ -55,7 +61,7 @@ class << self :query_stats_available?, :query_stats_enabled?, :query_stats_extension_enabled?, :query_stats_readable?, :rds_stats, :read_iops_stats, :region, :relation_sizes, :replica?, :replication_lag, :replication_lag_stats, :reset_query_stats, :reset_stats, :running_queries, :secret_access_key, :sequence_danger, :sequences, :settings, - :slow_queries, :space_growth, :ssl_used?, :stats_connection, :suggested_indexes, :suggested_indexes_by_query, + :slow_queries, :space_growth, :ssl_used?, :suggested_indexes, :suggested_indexes_by_query, :suggested_indexes_enabled?, :system_stats_enabled?, :table_caching, :table_hit_rate, :table_stats, :total_connections, :transaction_id_danger, :unused_indexes, :unused_tables, :write_iops_stats @@ -100,9 +106,10 @@ def config def databases @databases ||= begin + repository = PgHero::Repository.new Hash[ config["databases"].map do |id, c| - [id.to_sym, PgHero::Database.new(id, c)] + [id.to_sym, PgHero::Database.new(id, c, repository)] end ] end @@ -115,6 +122,7 @@ def primary_database def capture_query_stats(verbose: false) each_database do |database| next unless database.capture_query_stats? + puts "Capturing query stats for #{database.id}..." if verbose database.capture_query_stats(raise_errors: true) end @@ -138,15 +146,15 @@ def capture_query_blockers(verbose: false) each_database do |database| next unless database.capture_query_blockers? - puts "(Simulating) Capturing query blockers for #{database.id}..." if verbose - # TODO: actually implement database.capture_query_blockers + puts "Capturing query blockers for #{database.id}..." if verbose + database.capture_query_blockers end - true end def analyze_all(**options) each_database do |database| next if database.replica? + database.analyze_tables(**options) end end diff --git a/lib/pghero/base_database.rb b/lib/pghero/base_database.rb new file mode 100644 index 000000000..acafecade --- /dev/null +++ b/lib/pghero/base_database.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module PgHero + class BaseDatabase + + include Methods::Basic + + # Subclasses must define connection_model returning and ActiveRecord model + # for connection management + + def connection + connection_model.connection + end + end +end \ No newline at end of file diff --git a/lib/pghero/connection.rb b/lib/pghero/connection.rb deleted file mode 100644 index 04d398715..000000000 --- a/lib/pghero/connection.rb +++ /dev/null @@ -1,5 +0,0 @@ -module PgHero - class Connection < ActiveRecord::Base - self.abstract_class = true - end -end diff --git a/lib/pghero/database.rb b/lib/pghero/database.rb index aa925b81a..ed86630b3 100644 --- a/lib/pghero/database.rb +++ b/lib/pghero/database.rb @@ -1,12 +1,17 @@ +# frozen_string_literal: true + +require "active_record" + module PgHero - class Database - include Methods::Basic + class Database < BaseDatabase + include Methods::Connections include Methods::Explain include Methods::Indexes include Methods::Kill include Methods::Maintenance include Methods::Queries + include Methods::QueryBlockers include Methods::QueryStats include Methods::Replication include Methods::Sequences @@ -19,17 +24,18 @@ class Database attr_reader :id, :config - def initialize(id, config) + def initialize(id, config, repository) @id = id @config = config || {} + @repository = repository end def name - @name ||= @config["name"] || id.titleize + @name ||= config["name"] || id.titleize end def db_instance_identifier - @db_instance_identifier ||= @config["db_instance_identifier"] + @db_instance_identifier ||= config["db_instance_identifier"] end def capture_query_stats? @@ -70,10 +76,12 @@ def index_bloat_bytes private + attr_reader :repository + def connection_model @connection_model ||= begin url = config["url"] - Class.new(PgHero::Connection) do + Class.new(Connection) do def self.name "PgHero::Connection::Database#{object_id}" end @@ -87,5 +95,9 @@ def self.name end end end + + class Connection < ActiveRecord::Base + self.abstract_class = true + end end end diff --git a/lib/pghero/methods/basic.rb b/lib/pghero/methods/basic.rb index 8b33555ae..a8561f0bd 100644 --- a/lib/pghero/methods/basic.rb +++ b/lib/pghero/methods/basic.rb @@ -1,48 +1,93 @@ +# frozen_string_literal: true + +require 'active_record' + module PgHero module Methods module Basic + + PG_CONNECTION_ADAPTER_NAMES = %i[postgresql postgis].freeze + + ACTIVE_RECORD_CAST_METHOD = ActiveRecord::VERSION::MAJOR < 5 ? :type_cast : :cast + + private_constant :PG_CONNECTION_ADAPTER_NAMES, :ACTIVE_RECORD_CAST_METHOD + + # from ActiveSupport + def self.squish(str) + str.to_s.gsub(/\A[[:space:]]+/, '').gsub(/[[:space:]]+\z/, '').gsub(/[[:space:]]+/, ' ') + end + + def self.remove_line_comments(sql) + sql.gsub(/[\s]*--[^\r\n]*/, '') + end + + def self.sql_const(sql) + make_squishable(sql, squish(remove_line_comments(sql))) + end + + def self.make_squishable(obj, squished, freeze = true) + squished = squished.freeze if freeze + obj = obj.frozen? ? obj.dup : obj + obj.define_singleton_method(:squish, -> { squished }) + freeze ? obj.freeze : obj + end + + def execute(sql) + connection.execute(sql) + end + + def select_one(sql) + select_all(sql).first.values.first + end + + def quote(value) + connection.quote(value) + end + def ssl_used? ssl_used = nil with_transaction(rollback: true) do begin - execute("CREATE EXTENSION IF NOT EXISTS sslinfo") + execute('CREATE EXTENSION IF NOT EXISTS sslinfo') rescue ActiveRecord::StatementInvalid # not superuser end - ssl_used = select_one("SELECT ssl_is_used()") + ssl_used = select_one('SELECT ssl_is_used()') end ssl_used end def database_name - select_one("SELECT current_database()") + select_one('SELECT current_database()') end def server_version - @server_version ||= select_one("SHOW server_version") + @server_version ||= select_one('SHOW server_version') end def server_version_num - @server_version_num ||= select_one("SHOW server_version_num").to_i + @server_version_num ||= select_one('SHOW server_version_num').to_i end def quote_ident(value) quote_table_name(value) end - private - - def select_all(sql, conn = nil) - conn ||= connection + def select_all(sql) # squish for logs retries = 0 begin - result = conn.select_all(squish(sql)) - cast_method = ActiveRecord::VERSION::MAJOR < 5 ? :type_cast : :cast_value - result.map { |row| Hash[row.map { |col, val| [col.to_sym, result.column_types[col].send(cast_method, val)] }] } + result = connection.select_all(sql.respond_to?(:squish) ? sql.squish : squish(sql)) + result.map do |row| + Hash[ + row.map do |col, val| + [col.to_sym, result.column_types[col].send(ACTIVE_RECORD_CAST_METHOD, val)] + end + ] + end rescue ActiveRecord::StatementInvalid => e # fix for random internal errors - if e.message.include?("PG::InternalError") && retries < 2 + if e.message.include?('PG::InternalError') && retries < 2 retries += 1 sleep(0.1) retry @@ -52,10 +97,6 @@ def select_all(sql, conn = nil) end end - def select_all_stats(sql) - select_all(sql, stats_connection) - end - def select_all_size(sql) result = select_all(sql) result.each do |row| @@ -64,35 +105,12 @@ def select_all_size(sql) result end - def select_one(sql, conn = nil) - select_all(sql, conn).first.values.first + def select_one(sql) + select_all(sql).first.values.first end - def select_one_stats(sql) - select_one(sql, stats_connection) - end - - def execute(sql) - connection.execute(sql) - end - - def connection - connection_model.connection - end - - def stats_connection - ::PgHero::QueryStats.connection - end - - def insert_stats(table, columns, values) - values = values.map { |v| "(#{v.map { |v2| quote(v2) }.join(",")})" }.join(",") - columns = columns.map { |v| quote_table_name(v) }.join(",") - stats_connection.execute("INSERT INTO #{quote_table_name(table)} (#{columns}) VALUES #{values}") - end - - # from ActiveSupport def squish(str) - str.to_s.gsub(/\A[[:space:]]+/, "").gsub(/[[:space:]]+\z/, "").gsub(/[[:space:]]+/, " ") + Basic.squish(str) end def quote(value) @@ -121,22 +139,80 @@ def with_transaction(lock_timeout: nil, statement_timeout: nil, rollback: false) end def table_exists?(table) - ["PostgreSQL", "PostGIS"].include?(stats_connection.adapter_name) && - select_one_stats(<<-SQL - SELECT EXISTS ( + postgres_connection? && + select_one(<<-SQL) + SELECT EXISTS ( + #{table_exists_subquery(quote(table))} + ) + SQL + end + + def missing_tables(*tables) + return tables unless postgres_connection? + + result = select_all(<<-SQL) SELECT - 1 + table_name FROM - pg_catalog.pg_class c - INNER JOIN - pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE - n.nspname = 'public' - AND c.relname = #{quote(table)} - AND c.relkind = 'r' - ) + UNNEST(ARRAY[#{tables.map { |table| "'#{table}'" }.join(', ')}]::varchar[]) table_name + WHERE NOT EXISTS ( + #{table_exists_subquery('table_name')} + ) SQL - ) + + result.map { |row| row[:table_name] } + end + + def postgres_connection? + PG_CONNECTION_ADAPTER_NAMES.include?(connection_adapter_name) + end + + private + + def table_exists_subquery(quoted_relname) + # quoted_relname must be pre-quoted if it's a literal + <<-SQL + SELECT + 1 + FROM + pg_catalog.pg_class c + INNER JOIN + pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE + n.nspname = 'public' + AND c.relname = #{quoted_relname} + AND c.relkind = 'r' + SQL + end + + def quote_column_names(col_names) + connection = self.connection + col_names.map do |col_name| + connection.quote_table_name(col_name) + end + end + + def quote_row_values(row_values) + row_values.map do |value| + connection.quote(value) + end + end + + def quote_typed_column_names(typed_columns) + connection = self.connection + typed_columns.map { |col| col.quote_name(connection) } + end + + def quote_typed_row_values(typed_columns, row_values) + connection = self.connection + typed_columns.map.with_index do |typed_col, idx| + typed_col.quote_value(connection, row_values[idx]) + end + end + + # From ActiveRecord::Type - compatible with ActiveRecord::Type.lookup + def connection_adapter_name + connection.adapter_name.downcase.to_sym end end end diff --git a/lib/pghero/methods/connections.rb b/lib/pghero/methods/connections.rb index e6ae86120..d83fdc330 100644 --- a/lib/pghero/methods/connections.rb +++ b/lib/pghero/methods/connections.rb @@ -1,6 +1,12 @@ +# frozen_string_literal: true + module PgHero module Methods module Connections + + CONNECTION_STATS_TABLE = 'pghero_connection_stats' + private_constant :CONNECTION_STATS_TABLE + def total_connections select_one("SELECT COUNT(*) FROM pg_stat_activity") end @@ -54,9 +60,9 @@ def connection_sources_by_user end def recently_connected_users - users = select_all_stats <<-SQL + users = repository.select_all <<-SQL SELECT distinct username - FROM "pghero_connection_stats" + FROM "pghero_connection_stats" WHERE database = #{quote(id)} AND captured_at > date_trunc('day', NOW() - interval '2 hours') ORDER by username @@ -64,7 +70,7 @@ def recently_connected_users end def connection_history_for_user(username) - history = select_all_stats <<-SQL + history = repository.select_all <<-SQL SELECT date_trunc('minute', captured_at) as the_date, max(total_connections) as tot FROM "pghero_connection_stats" WHERE database= #{quote(id)} @@ -82,11 +88,11 @@ def capture_connection_stats connection_sources_by_user.each do |rs| values << [id, rs[:total_connections].to_i,rs[:user], now] end - insert_stats("pghero_connection_stats", columns, values) if values.any? + repository.insert(CONNECTION_STATS_TABLE, columns, values) if values.any? end - + def connection_stats_enabled? - table_exists?("pghero_connection_stats") + repository.table_exists?(CONNECTION_STATS_TABLE) end end diff --git a/lib/pghero/methods/maintenance.rb b/lib/pghero/methods/maintenance.rb index bc315db86..aadc97acf 100644 --- a/lib/pghero/methods/maintenance.rb +++ b/lib/pghero/methods/maintenance.rb @@ -34,7 +34,7 @@ def autovacuum_danger end def vacuum_progress - if server_version_num >= 90600 + if server_version_num >= PgConst::VERSION_9_6 select_all <<-SQL SELECT pid, diff --git a/lib/pghero/methods/queries.rb b/lib/pghero/methods/queries.rb index f035e4e40..7069f1723 100644 --- a/lib/pghero/methods/queries.rb +++ b/lib/pghero/methods/queries.rb @@ -8,7 +8,7 @@ def running_queries(min_duration: nil, all: false) state, application_name AS source, age(NOW(), COALESCE(query_start, xact_start)) AS duration, - #{server_version_num >= 90600 ? "(wait_event IS NOT NULL) AS waiting" : "waiting"}, + #{server_version_num >= PgConst::VERSION_9_6 ? "(wait_event IS NOT NULL) AS waiting" : "waiting"}, query, COALESCE(query_start, xact_start) AS started_at, EXTRACT(EPOCH FROM NOW() - COALESCE(query_start, xact_start)) * 1000.0 AS duration_ms, diff --git a/lib/pghero/methods/query_blockers.rb b/lib/pghero/methods/query_blockers.rb new file mode 100644 index 000000000..4bad25aa6 --- /dev/null +++ b/lib/pghero/methods/query_blockers.rb @@ -0,0 +1,234 @@ +# frozen_string_literal: true + +require_relative 'type_const' + +module PgHero + module Methods + module QueryBlockers + + def supports_query_blocker_monitoring? + supports_pg_blocking_pids? + end + + def capture_query_blockers? + config['capture_query_blockers'] != false + end + + def capture_query_blockers(raise_errors: false, save_empty_samples: true) + return unless capture_query_blockers? + + sample_set = sample_query_blockers + if !sample_set.sessions.empty? || save_empty_samples + repository.insert_query_blockers(sample_set, raise_errors: raise_errors) + end + sample_set + end + + def sample_query_blockers + raise NotEnabled, 'Query blockers requires Postgres 9.6+ support for pg_blocking_pids' unless supports_pg_blocking_pids? + + SampleSet.new(self) + end + + class SampleSet + # Do transforms (both normalization and de-normalization) + # to make the data easier for later analysis here rather + # than in SQL to minimize the cost on the monitored DB + # and the complexity of the (already complicated) query + # used for atomic data collection + + BLOCKER_QUERY_COLUMNS = + TypeConst.const_column_list( + pid: TypeConst::INTEGER, + user: TypeConst::TEXT, + source: TypeConst::TEXT, + client_addr: TypeConst::INET, + client_hostname: TypeConst::TEXT, + client_port: TypeConst::INTEGER, + backend_start: TypeConst::DATETIME, + xact_start: TypeConst::DATETIME, + query_start: TypeConst::DATETIME, + state_change: TypeConst::TEXT, + wait_event_type: TypeConst::TEXT, + wait_event: TypeConst::TEXT, + state: TypeConst::TEXT, + backend_xid: TypeConst::XID, + backend_xmin: TypeConst::XID, + query: TypeConst::TEXT, + backend_type: TypeConst::TEXT, + blocked_by: TypeConst::INTEGER_ARRAY + ) + + BLOCKER_QUERY_COLUMN_NAMES = + BLOCKER_QUERY_COLUMNS.map(&:name).freeze + + private_constant :BLOCKER_QUERY_COLUMNS, :BLOCKER_QUERY_COLUMN_NAMES + + BLOCKER_ATTRIBUTE_COLUMNS = + (BLOCKER_QUERY_COLUMNS + TypeConst.const_column_list(blocking: TypeConst::INTEGER_ARRAY)).freeze + + + attr_reader :captured_at, :database, :txid_xmin, :txid_xmax, :txid_xip, :sessions + attr_accessor :id + + def initialize(database) + records = database.select_all BLOCKER_SAMPLE_SET_SQL + first_record = records.first # Encodes whether the set has any real blockers + + @captured_at = first_record[:sample_captured_at] + @database = first_record[:sample_database] + @txid_xmin = first_record[:sample_txid_xmin] + @txid_xmax = first_record[:sample_txid_xmax] + @txid_xip = first_record[:sample_txid_xip] + + @sessions = first_record[:pid] ? rows_to_sessions(records) : {} + end + + private + + def rows_to_sessions(result) + session_cache = {} + + result.map do |row| + current_pid = row[:pid] + session = row.slice(*BLOCKER_QUERY_COLUMN_NAMES) + + # Might have already encountered earlier blockees + session[:blocking] = session_cache[current_pid]&.[](:blocking) + session_cache[current_pid] = session + + session[:blocked_by]&.each do |blocker_pid| + add_blockee(current_pid, blocker_pid, session_cache) + end + session + end + end + + def add_blockee(blockee_pid, blocker_pid, session_cache) + blocker_session = (session_cache[blocker_pid] ||= {}) + (blocker_session[:blocking] ||= []).push(blockee_pid) + end + + # Include inline SQL comments to document nuances of the query + # here (they execute fine); but they break internal quoting logic + # (that removes newlines) so strip them out for runtime use + BLOCKER_SAMPLE_SET_SQL = Basic.sql_const(<<-SQL) + WITH blocked_pids AS ( + -- Pids of all sessions with blockers + SELECT + pid blocked_pid, + pg_blocking_pids(pid) AS blocked_by + FROM + pg_stat_activity + WHERE + CARDINALITY(pg_blocking_pids(pid)) > 0), + + blockers_and_blockees as ( + -- Details of all blockers and blockees; grab almost + -- everything since catching blockers via sampling + -- is hit and miss so forensic details are valuable + SELECT + psa.pid pid, + usename, + application_name, + client_addr, + client_hostname, + client_port, + backend_start, + xact_start, + query_start, + state_change, + wait_event_type, + wait_event, + state, + backend_xid, + backend_xmin, + query, + backend_type, + bp.blocked_by + FROM + pg_stat_activity psa + LEFT OUTER JOIN -- allows matching blockers as well as blockees + blocked_pids bp + ON + psa.pid = bp.blocked_pid -- normal join matches blockees + WHERE + datname = current_database() + AND ( + bp.blocked_pid IS NOT NULL -- blockees that already matched JOIN ON + OR EXISTS -- adds blockers that are not also blockees + (SELECT * FROM blocked_pids bp2 WHERE psa.pid = ANY(bp2.blocked_by)) + ) + ), + + sample_set_header as ( + -- Details to record a sample set + -- even if there were no blockers + SELECT + current_database() sample_database, + NOW() sample_captured_at, + -- Include txid snapshot details so that txid epoch for backend_xid and backend_xmin + -- can be inferred; do not compare these directly to backend values without + -- accounting for epoch adjustment + txid_snapshot_xmin(txid_current_snapshot()) sample_txid_xmin, + txid_snapshot_xmax(txid_current_snapshot()) sample_txid_xmax, + ARRAY(SELECT txid_snapshot_xip(txid_current_snapshot())) sample_txid_xip + ) + + -- Sample set always return at least one row + -- including the timestamp and database + -- clients should check for the special case + -- of one row with a null pid meaning there were + -- no blockers or blockees in the sample set + SELECT + header.sample_database, + header.sample_captured_at, + header.sample_txid_xmin, + header.sample_txid_xmax, + header.sample_txid_xip, + bab.pid, + bab.usename::text "user", + bab.application_name source, + bab.client_addr, + bab.client_hostname, + bab.client_port, + bab.backend_start, + bab.xact_start, + bab.query_start, + bab.state_change, + bab.wait_event_type, + bab.wait_event, + bab.state, + bab.backend_xid::text::bigint, -- careful, wraps around, 32-bit unsigned value, no epoch + bab.backend_xmin::text::bigint, -- careful, wraps around, 32-bit unsigned value, no epoch + bab.query, + bab.backend_type, + bab.blocked_by + FROM + sample_set_header header + LEFT OUTER JOIN + blockers_and_blockees bab + ON TRUE + ORDER BY bab.pid + SQL + + private_constant :BLOCKER_SAMPLE_SET_SQL + end + + private + + def supports_pg_blocking_pids? + # pg_blocking_pids introduced in Postgresql 9.6 + # Release Note: https://www.postgresql.org/docs/9.6/release-9-6.html#AEN133618 + # Current Doc: https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-SESSION-TABLE + # + # Previously complex queries on pg_locks were widely used but they are both convoluted and inferior so this + # feature is based only on pg_blocking_pids given that Postgres 9.6 has been available since Sept. 2016. + # + # Technical details on the improvements pg_blocking_pids introduced: + # https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=52f5d578d6c29bf254e93c69043b817d4047ca67 + server_version_num >= PgConst::VERSION_9_6 + end + end + end +end diff --git a/lib/pghero/methods/query_blockers_history.rb b/lib/pghero/methods/query_blockers_history.rb new file mode 100644 index 000000000..6bb08b8fd --- /dev/null +++ b/lib/pghero/methods/query_blockers_history.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +require_relative 'type_const' + +module PgHero + module Methods + module QueryBlockersHistory + + BLOCKER_SAMPLE_TABLE = 'pghero_blocker_samples' + + BLOCKER_SAMPLE_SESSION_TABLE = 'pghero_blocker_sample_sessions' + + ID_COL_LIST = TypeConst.const_column_list(id: TypeConst::BIGINT) + + INSERT_BLOCKER_SAMPLE_COLS = + TypeConst.const_column_list( + database: TypeConst::TEXT, + captured_at: TypeConst::DATETIME, + txid_xmin: TypeConst::BIGINT, + txid_xmax: TypeConst::BIGINT, + txid_xip: TypeConst::BIGINT_ARRAY + ) + + INSERT_BLOCKER_SAMPLE_SESSION_COLS = ( + TypeConst.const_column_list(blocker_sample_id: TypeConst::BIGINT) + + QueryBlockers::SampleSet::BLOCKER_ATTRIBUTE_COLUMNS + ).freeze + + private_constant :BLOCKER_SAMPLE_TABLE, + :BLOCKER_SAMPLE_SESSION_TABLE, + :ID_COL_LIST, + :INSERT_BLOCKER_SAMPLE_COLS, + :INSERT_BLOCKER_SAMPLE_SESSION_COLS + + def supports_query_blocker_history?(raise_on_unsupported: false) + return @blockers_tables_usable if @blockers_tables_usable + + missing_tables = self.missing_tables(BLOCKER_SAMPLE_TABLE, BLOCKER_SAMPLE_SESSION_TABLE) + @blocker_tables_usable = missing_tables.empty? + + if !@blocker_tables_usable && raise_on_unsupported + raise NotEnabled, "Missing table(s): #{missing_tables.join(', ')} are required to track blocker history" + end + + @blocker_tables_usable + end + + def insert_query_blockers(sample_set, raise_errors: false) + return unless supports_query_blocker_history?(raise_on_unsupported: raise_errors) + + with_transaction do # Might already be in a transaction; that's fine + sample_set.id = insert_query_blocker_sample(sample_set) + unless sample_set.sessions.empty? + # Maximum 1K records at a time to keep the SQL INSERT string "reasonable" + sample_set.sessions.each_slice(1000) do |session_batch| + insert_session_batch(sample_set.id, session_batch) + end + end + end + sample_set.id + end + + # TODO: add support for querying historical data + + private + + def insert_query_blocker_sample(sample_set) + result = insert_typed(BLOCKER_SAMPLE_TABLE, + INSERT_BLOCKER_SAMPLE_COLS, + [sample_values(sample_set)], + typed_return_cols: ID_COL_LIST) + result.first[:id.to_s] + end + + def insert_session_batch(sample_id, session_batch) + result = insert_typed(BLOCKER_SAMPLE_SESSION_TABLE, + INSERT_BLOCKER_SAMPLE_SESSION_COLS, + session_values(sample_id, session_batch), + typed_return_cols: ID_COL_LIST) + session_batch.each.with_index do |session, idx| + session[:id] = result[idx][:id.to_s] + end + end + + def sample_values(sample_set) + INSERT_BLOCKER_SAMPLE_COLS.map { |col| sample_set.send(col.name) } + end + + def session_values(sample_id, session_batch) + session_batch.map do |session| + INSERT_BLOCKER_SAMPLE_SESSION_COLS.map.with_index do |col, idx| + col.name == :blocker_sample_id ? sample_id : session[col.name] + end + end + end + end + end +end diff --git a/lib/pghero/methods/query_stats.rb b/lib/pghero/methods/query_stats.rb index a6b5c9431..0fc85ef72 100644 --- a/lib/pghero/methods/query_stats.rb +++ b/lib/pghero/methods/query_stats.rb @@ -72,15 +72,15 @@ def historical_query_stats_enabled? end def query_stats_table_exists? - table_exists?("pghero_query_stats") + repository.table_exists?("pghero_query_stats") end def missing_query_stats_columns - %w(query_hash user) - PgHero::QueryStats.column_names + %w(query_hash user) - PgHero::Repository::QueryStats.column_names end def supports_query_hash? - server_version_num >= 90400 + server_version_num >= PgConst::VERSION_9_4 end # resetting query stats will reset across the entire Postgres instance @@ -123,7 +123,7 @@ def capture_query_stats(raise_errors: false) end columns = %w[database query total_time calls captured_at query_hash user] - insert_stats("pghero_query_stats", columns, values) + repository.insert("pghero_query_stats", columns, values) end end end @@ -137,7 +137,7 @@ def slow_queries(query_stats: nil, **options) def query_hash_stats(query_hash, user: nil) if historical_query_stats_enabled? && supports_query_hash? start_at = 24.hours.ago - select_all_stats <<-SQL + repository.select_all <<-SQL SELECT captured_at, total_time / 1000 / 60 AS total_minutes, @@ -208,7 +208,7 @@ def current_query_stats(limit: nil, sort: nil, database: nil, query_hash: nil) def historical_query_stats(sort: nil, start_at: nil, end_at: nil, query_hash: nil) if historical_query_stats_enabled? sort ||= "total_minutes" - select_all_stats <<-SQL + repository.select_all <<-SQL WITH query_stats AS ( SELECT #{supports_query_hash? ? "query_hash" : "md5(query)"} AS query_hash, diff --git a/lib/pghero/methods/replication.rb b/lib/pghero/methods/replication.rb index f1da63278..f5f4de32a 100644 --- a/lib/pghero/methods/replication.rb +++ b/lib/pghero/methods/replication.rb @@ -12,7 +12,7 @@ def replica? def replication_lag with_feature_support(:replication_lag) do lag_condition = - if server_version_num >= 100000 + if server_version_num >= PgConst::VERSION_10 "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()" else "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()" @@ -30,7 +30,7 @@ def replication_lag end def replication_slots - if server_version_num >= 90400 + if server_version_num >= PgConst::VERSION_9_4 with_feature_support(:replication_slots, []) do select_all <<-SQL SELECT diff --git a/lib/pghero/methods/repository.rb b/lib/pghero/methods/repository.rb new file mode 100644 index 000000000..8accc93f9 --- /dev/null +++ b/lib/pghero/methods/repository.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require 'active_record' + +module PgHero + module Methods + module Repository + + def insert(table, column_names, values_table, return_cols: nil) + quoted_column_list = quote_column_names(column_names) + + row_sql_list = values_table.map do |row_values| + "(#{quote_row_values(row_values).join(',')})" + end + + quoted_return_list = return_cols ? quote_column_names(return_cols) : nil + _insert(table, quoted_column_list, row_sql_list, quoted_return_list) + end + + def insert_typed(table, typed_columns, values_table, typed_return_cols: nil) + quoted_column_list = quote_typed_column_names(typed_columns) + + row_sql_list = values_table.map do |row_values| + "(#{quote_typed_row_values(typed_columns, row_values).join(',')})" + end + + quoted_return_list = typed_return_cols ? quote_typed_column_names(typed_return_cols) : nil + _insert(table, quoted_column_list, row_sql_list, quoted_return_list) + end + + private + + def _insert(table, quoted_column_list, rows_sql_list, quoted_return_list) + column_sql = quoted_column_list.join(',') + values_sql = rows_sql_list.join(',') + + insert_sql = <<-SQL + INSERT INTO #{quote_table_name(table)} + (#{column_sql}) + VALUES + #{values_sql} + SQL + + if quoted_return_list + insert_sql += <<-SQL if quoted_return_list + RETURNING + #{quoted_return_list.join(',')} + SQL + end + connection.execute(insert_sql) + end + + def quote_column_names(col_names) + connection = self.connection + col_names.map do |col_name| + connection.quote_table_name(col_name) + end + end + + def quote_row_values(row_values) + row_values.map do |value| + connection.quote(value) + end + end + + def quote_typed_column_names(typed_columns) + connection = self.connection + typed_columns.map { |col| col.quote_name(connection) } + end + + def quote_typed_row_values(typed_columns, row_values) + connection = self.connection + typed_columns.map.with_index do |typed_col, idx| + typed_col.quote_value(connection, row_values[idx]) + end + end + end + end +end diff --git a/lib/pghero/methods/settings.rb b/lib/pghero/methods/settings.rb index d1abfcc85..90180a510 100644 --- a/lib/pghero/methods/settings.rb +++ b/lib/pghero/methods/settings.rb @@ -3,7 +3,7 @@ module Methods module Settings def settings names = - if server_version_num >= 90500 + if server_version_num >= PgConst::VERSION_9_5 %i( max_connections shared_buffers effective_cache_size work_mem maintenance_work_mem min_wal_size max_wal_size checkpoint_completion_target diff --git a/lib/pghero/methods/space.rb b/lib/pghero/methods/space.rb index b701eed6f..7061a7a00 100644 --- a/lib/pghero/methods/space.rb +++ b/lib/pghero/methods/space.rb @@ -52,7 +52,7 @@ def space_growth(days: 7, relation_sizes: nil) sizes = Hash[ relation_sizes.map { |r| [[r[:schema], r[:relation]], r[:size_bytes]] } ] start_at = days.days.ago - stats = select_all_stats <<-SQL + stats = repository.select_all <<-SQL WITH t AS ( SELECT schema, @@ -95,7 +95,7 @@ def relation_space_stats(relation, schema: "public") sizes = Hash[ relation_sizes.map { |r| [[r[:schema], r[:relation]], r[:size_bytes]] } ] start_at = 30.days.ago - stats = select_all_stats <<-SQL + stats = repository.select_all <<-SQL SELECT captured_at, size AS size_bytes @@ -126,11 +126,11 @@ def capture_space_stats relation_sizes.each do |rs| values << [id, rs[:schema], rs[:relation], rs[:size_bytes].to_i, now] end - insert_stats("pghero_space_stats", columns, values) if values.any? + repository.insert("pghero_space_stats", columns, values) if values.any? end def space_stats_enabled? - table_exists?("pghero_space_stats") + repository.table_exists?("pghero_space_stats") end end end diff --git a/lib/pghero/methods/type_const.rb b/lib/pghero/methods/type_const.rb new file mode 100644 index 000000000..602c6a2ef --- /dev/null +++ b/lib/pghero/methods/type_const.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +require 'active_record' +require 'active_record/connection_adapters/postgresql_adapter' + +module PgHero + module Methods + module TypeConst + + def self.lookup_type(*args, **kwargs) + ActiveRecord::Type.lookup(*args, adapter: :postgresql, **kwargs) + end + + BIGINT = lookup_type(:integer, limit: 8) + BIGINT_ARRAY = lookup_type(:integer, limit: 8, array: true) # PG specific + BOOLEAN = lookup_type(:boolean) + DATE = lookup_type(:date) + DATETIME = lookup_type(:datetime) + DECIMAL = lookup_type(:decimal) + FLOAT = lookup_type(:float) + INET = lookup_type(:inet) # PG specific + INTEGER = lookup_type(:integer) + INTEGER_ARRAY = lookup_type(:integer, array: true) # PG specific + STRING = lookup_type(:string) + TEXT = lookup_type(:text) + TIME = lookup_type(:time) + + # XID is a 32-bit unsigned value that eventually wraps; + # store in BIGINT to avoid negatives. Not the same as a + # BIGINT txid value that is actually 64-bits with an + # epoch to avoid wraparound + XID = BIGINT + + def self.define_column(name, type) + TypedColumn.new(name, type) + end + + def self.const_column_list(**kwargs) + kwargs.each_pair.map do |col_name, col_type| + define_column(col_name, col_type) + end.freeze + end + + class TypedColumn + attr_reader :name + + def initialize(name, type) + @name = name.to_sym + @type = type + freeze + end + + def quote_name(connection) + connection.quote_table_name(@name) + end + + def quote_value(connection, value) + connection.quote(@type.serialize(value)) + end + end + + private_class_method :lookup_type + end + end +end + diff --git a/lib/pghero/pg_const.rb b/lib/pghero/pg_const.rb new file mode 100644 index 000000000..869e63e54 --- /dev/null +++ b/lib/pghero/pg_const.rb @@ -0,0 +1,8 @@ +module PgHero + module PgConst + VERSION_9_4 = 9_04_00 + VERSION_9_5 = 9_05_00 + VERSION_9_6 = 9_06_00 + VERSION_10 = 10_00_00 + end +end \ No newline at end of file diff --git a/lib/pghero/query_stats.rb b/lib/pghero/query_stats.rb deleted file mode 100644 index b79e1a098..000000000 --- a/lib/pghero/query_stats.rb +++ /dev/null @@ -1,7 +0,0 @@ -module PgHero - class QueryStats < ActiveRecord::Base - self.abstract_class = true - self.table_name = "pghero_query_stats" - establish_connection ENV["PGHERO_STATS_DATABASE_URL"] if ENV["PGHERO_STATS_DATABASE_URL"] - end -end diff --git a/lib/pghero/repository.rb b/lib/pghero/repository.rb new file mode 100644 index 000000000..ffb7738f2 --- /dev/null +++ b/lib/pghero/repository.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +require 'active_record' + +module PgHero + class Repository < BaseDatabase + # Repository extends BaseDatabase and not Database (aka MonitoredDatabase) + # because we keep separate connections for monitoring vs repository access + # even when monitoring the repository itself. This keeps the logic, + # transaction isolation, etc completely segregated. + + include Methods::Repository + include Methods::QueryBlockersHistory + + private + + def connection_model + QueryStats + end + + class QueryStats < ActiveRecord::Base + self.abstract_class = true + self.table_name = 'pghero_query_stats' + establish_connection ENV['PGHERO_STATS_DATABASE_URL'] if ENV['PGHERO_STATS_DATABASE_URL'] + end + end +end diff --git a/lib/pghero/version.rb b/lib/pghero/version.rb index b0a5ce0e8..0e16cff33 100644 --- a/lib/pghero/version.rb +++ b/lib/pghero/version.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module PgHero VERSION = "2.3.0" end diff --git a/lib/tasks/pghero.rake b/lib/tasks/pghero.rake index 6e1614fb3..ed0628249 100644 --- a/lib/tasks/pghero.rake +++ b/lib/tasks/pghero.rake @@ -1,15 +1,17 @@ +# frozen_string_literal: true + namespace :pghero do - desc "capture query stats" + desc 'capture query stats' task capture_query_stats: :environment do PgHero.capture_query_stats(verbose: true) end - desc "capture space stats" + desc 'capture space stats' task capture_space_stats: :environment do PgHero.capture_space_stats(verbose: true) end - desc "capture connection stats" + desc 'capture connection stats' task capture_connection_stats: :environment do PgHero.capture_connection_stats(verbose: true) end @@ -21,10 +23,10 @@ namespace :pghero do desc "analyze tables" task analyze: :environment do - PgHero.analyze_all(verbose: true, min_size: ENV["MIN_SIZE_GB"].to_f.gigabytes) + PgHero.analyze_all(verbose: true, min_size: ENV['MIN_SIZE_GB'].to_f.gigabytes) end - desc "autoindex" + desc 'autoindex' task autoindex: :environment do PgHero.autoindex_all(verbose: true, create: true) end diff --git a/test/basic_test.rb b/test/basic_test.rb index b9e5e4120..6bb3158fa 100644 --- a/test/basic_test.rb +++ b/test/basic_test.rb @@ -1,4 +1,4 @@ -require_relative "test_helper" +require_relative 'test_helper' class BasicTest < Minitest::Test def setup diff --git a/test/capture_test.rb b/test/capture_test.rb index 74cc06f2a..aec5e9c4f 100644 --- a/test/capture_test.rb +++ b/test/capture_test.rb @@ -3,11 +3,33 @@ require_relative 'test_helper' class CaptureTest < Minitest::Test + def test_primary_database_capture_query_stats + stats_repository.with_transaction(rollback: rollback_enabled?) do + assert primary_database.capture_query_stats(raise_errors: true) + end + end + + def test_capture_query_stats + stats_repository.with_transaction(rollback: rollback_enabled?) do + assert PgHero.capture_query_stats(verbose: true) + end + end - def test_capture_query_blockers - # TODO: test needs to be in a transaction rollback as soon - # as capture_query_blockers implements real inserts - assert PgHero.capture_query_blockers(verbose: true) + def test_capture_space_stats + stats_repository.with_transaction(rollback: rollback_enabled?) do + assert PgHero.capture_space_stats(verbose: true) + end end + def test_capture_query_stats + stats_repository.with_transaction(rollback: rollback_enabled?) do + assert PgHero.capture_query_stats(verbose: true) + end + end + + def test_capture_connection_stats + stats_repository.with_transaction(rollback: rollback_enabled?) do + assert PgHero.capture_connection_stats(verbose: true) + end + end end diff --git a/test/query_blockers_test.rb b/test/query_blockers_test.rb new file mode 100644 index 000000000..679c7a3f7 --- /dev/null +++ b/test/query_blockers_test.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +require_relative 'test_helper' + +class QueryBlockersTest < Minitest::Test + + def test_primary_database_current_blockers + assert primary_database.sample_query_blockers + end + + def test_primary_database_capture_blockers_return + run_with_blockers(rollback: rollback_enabled?) do + blocker_sample = primary_database.capture_query_blockers(raise_errors: true) + assert blocker_sample.sessions.size == 2 + end + end + + def test_capture_blockers + run_with_blockers(rollback: rollback_enabled?) do + assert PgHero.capture_query_blockers(verbose: true) + end + end + + private + + def run_with_blockers(rollback: true) + # TODO: ideally we would use a third thread to separate the + # transaction/connection used for storing historical stat + User.transaction do + locked_user = User.all.first + locked_user.active = !locked_user.active + locked_user.save + + t = lock_user_in_separate_thread(locked_user.id) + # Give thread one second to block on the DB lock; + # test could fail if DB is too slow - increase block time if needed + t.join(1) + yield + raise ActiveRecord::Rollback, 'Test - do not save anything' if rollback + end + end + + def lock_user_in_separate_thread(user_id) + Thread.new do + # No full Rails dependency so can't use Rails.application.executor.wrap; use older with_connection mechanism + ActiveRecord::Base.connection_pool.with_connection do + # Will block on the main thread holding the row lock until the main transaction completes + User.transaction do + User.find(user_id).lock! + end + end + end + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index 0b667e08e..ffb43a595 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -66,3 +66,16 @@ class User < ActiveRecord::Base end User.import users, validate: false ActiveRecord::Base.connection.execute("ANALYZE users") + +def primary_database + PgHero.databases[:primary] +end + +def stats_repository + primary_database.send :repository +end + +def rollback_enabled? + false # TODO: JRD - turn this back off +end +