Skip to content

Commit

Permalink
Replication metrics (#6)
Browse files Browse the repository at this point in the history
* added replication metrics: `pg_wal_receiver_status`, `pg_wal_current_lsn`, `pg_wal_receive_lsn`, `pg_wal_reply_lsn`, `pg_wal_replay_paused`
  • Loading branch information
apetruhin authored Jul 29, 2022
1 parent 879c41c commit e9c726c
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 3 deletions.
48 changes: 45 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ var (
dTopQueryIOTime = desc("pg_top_query_io_time_per_second", "Time the query spent awaiting IO", "db", "user", "query")

dLockAwaitingQueries = desc("pg_lock_awaiting_queries", "Number of queries awaiting a lock", "db", "user", "blocking_query")

dWalReceiverStatus = desc("pg_wal_receiver_status", "WAL receiver status: 1 if the receiver is connected, otherwise 0", "sender_host", "sender_port")
dWalReplayPaused = desc("pg_wal_replay_paused", "Whether WAL replay paused or not")
dWalCurrentLsn = desc("pg_wal_current_lsn", "Current WAL sequence number")
dWalReceiveLsn = desc("pg_wal_receive_lsn", "WAL sequence number that has been received and synced to disk by streaming replication")
dWalReplyLsn = desc("pg_wal_reply_lsn", "WAL sequence number that has been replayed during recovery")
)

type QueryKey struct {
Expand Down Expand Up @@ -68,6 +74,7 @@ type Collector struct {
saCurr *saSnapshot
saPrev *saSnapshot
settings []Setting
replicationStatus *replicationStatus

lock sync.RWMutex
logger logger.Logger
Expand Down Expand Up @@ -118,6 +125,15 @@ func (c *Collector) snapshot() {
c.logger.Warning(err)
return
}

if c.settings, err = c.getSettings(); err != nil {
c.logger.Warning(err)
}

if c.replicationStatus, err = c.getReplicationStatus(version); err != nil {
c.logger.Warning(err)
}

c.ssPrev = c.ssCurr
c.saPrev = c.saCurr
c.ssCurr, err = c.getStatStatements(version)
Expand All @@ -130,9 +146,6 @@ func (c *Collector) snapshot() {
c.logger.Warning(err)
return
}
if c.settings, err = c.getSettings(); err != nil {
c.logger.Warning(err)
}
}

func (c *Collector) summaries() (map[QueryKey]*QuerySummary, time.Duration) {
Expand Down Expand Up @@ -270,6 +283,26 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) {
for _, s := range c.settings {
ch <- gauge(dSettings, s.Value, s.Name, s.Unit)
}

if c.replicationStatus != nil {
rs := c.replicationStatus
if rs.isInRecovery {
ch <- counter(dWalReceiveLsn, float64(rs.receiveLsn))
ch <- counter(dWalReplyLsn, float64(rs.replyLsn))
isReplayPaused := 0.0
if rs.isReplayPaused {
isReplayPaused = 1.0
}
ch <- gauge(dWalReplayPaused, isReplayPaused)
host, port, err := rs.primaryHostPort()
if err != nil {
c.logger.Warning(err)
}
ch <- gauge(dWalReceiverStatus, float64(rs.walReceiverStatus), host, port)
} else {
ch <- counter(dWalCurrentLsn, float64(rs.currentLsn))
}
}
}

func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
Expand All @@ -284,6 +317,11 @@ func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- dTopQueryTime
ch <- dTopQueryIOTime
ch <- dDbQueries
ch <- dWalReceiverStatus
ch <- dWalReplayPaused
ch <- dWalCurrentLsn
ch <- dWalReceiveLsn
ch <- dWalReplyLsn
}

func desc(name, help string, labels ...string) *prometheus.Desc {
Expand All @@ -293,3 +331,7 @@ func desc(name, help string, labels ...string) *prometheus.Desc {
// gauge wraps value in a constant prometheus metric of type GaugeValue
// described by desc; labels are passed through as the label values.
func gauge(desc *prometheus.Desc, value float64, labels ...string) prometheus.Metric {
	metric := prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labels...)
	return metric
}

// counter wraps value in a constant prometheus metric of type CounterValue
// described by desc; labels are passed through as the label values.
func counter(desc *prometheus.Desc, value float64, labels ...string) prometheus.Metric {
	metric := prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labels...)
	return metric
}
107 changes: 107 additions & 0 deletions collector/pg_replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package collector

import (
"database/sql"
"errors"
"fmt"
"github.com/blang/semver"
"net/url"
"regexp"
"strings"
)

var (
	// valueRe is the shared tail of the keyword/value patterns below: an
	// equals sign with optional whitespace around it, at most one opening
	// single quote, and then the value itself as capture group 1.
	//
	// The capture is allowed to be empty so that an explicitly empty setting
	// such as host='' yields "" instead of skipping over both quotes and
	// capturing the next keyword of the connection string.
	valueRe = `\s*=\s*'?\s*([^\s']*)`

	// The leading \b stops a keyword from matching as the tail of a longer
	// word, e.g. "host" inside "myhost=...".
	hostRe     = regexp.MustCompile(`\bhost` + valueRe)
	hostaddrRe = regexp.MustCompile(`\bhostaddr` + valueRe)
	portRe     = regexp.MustCompile(`\bport` + valueRe)
)

// findValue applies re to src and returns the text captured by the first
// capture group of the leftmost match. It returns "" when re does not match
// src or when re has no capture group.
func findValue(src string, re *regexp.Regexp) string {
	if groups := re.FindStringSubmatch(src); len(groups) >= 2 {
		return groups[1]
	}
	return ""
}

// replicationStatus is the replication-related state read from the server by
// getReplicationStatus. Which fields are populated depends on isInRecovery.
type replicationStatus struct {
	// isInRecovery is the result of pg_is_in_recovery().
	isInRecovery bool

	// WAL positions, each read as "<lsn function>() - '0/0'".
	// currentLsn is filled only when the server is not in recovery;
	// receiveLsn and replyLsn are filled only when it is.
	currentLsn, receiveLsn, replyLsn int64

	// isReplayPaused is read only while in recovery.
	isReplayPaused bool

	// walReceiverStatus is the row count of pg_stat_wal_receiver.
	walReceiverStatus int64
	// primaryConnectionInfo is the primary_conninfo value from pg_settings;
	// it stays empty when that setting has no row.
	primaryConnectionInfo string
}

// primaryHostPort extracts the primary server's host and port from the stored
// primary_conninfo value.
//
// Two syntaxes are recognised:
//   - connection URIs starting with "postgres://" or "postgresql://";
//   - keyword/value strings such as "host=10.0.0.1 port=5432", where
//     "hostaddr" is used when "host" is absent or empty.
//
// Either returned string may be empty when the connection string does not
// specify it. The error is non-nil only when a URI cannot be parsed.
func (rs *replicationStatus) primaryHostPort() (string, string, error) {
	ci := rs.primaryConnectionInfo
	if strings.HasPrefix(ci, "postgres://") || strings.HasPrefix(ci, "postgresql://") {
		u, err := url.Parse(ci)
		if err != nil {
			// don't log url.Parse errors since they might contain security sensitive data
			return "", "", errors.New("failed to parse primary_conninfo")
		}
		host, port := u.Hostname(), u.Port()
		// libpq also accepts the target as URI query parameters, e.g.
		// postgresql:///mydb?host=localhost&port=5433, in which case the
		// authority part is empty. Fall back to those parameters.
		params := u.Query()
		if host == "" {
			host = params.Get("host")
		}
		if host == "" {
			host = params.Get("hostaddr")
		}
		if port == "" {
			port = params.Get("port")
		}
		return host, port, nil
	}
	host := findValue(ci, hostRe)
	if host == "" {
		host = findValue(ci, hostaddrRe)
	}
	port := findValue(ci, portRe)
	return host, port, nil
}

// getReplicationStatus queries the server for its replication state.
//
// It first asks pg_is_in_recovery(). On a server that is not in recovery only
// the current WAL position is read. On a server in recovery it reads the
// received and replayed WAL positions, whether replay is paused, the number
// of rows in pg_stat_wal_receiver, and the primary_conninfo setting.
//
// version selects the function names: the pg_*_xlog_* family for 9.6 and the
// pg_*_wal_* family for 10 and later. Older versions are rejected with an
// error. On any error the returned status is nil.
func (c *Collector) getReplicationStatus(version semver.Version) (*replicationStatus, error) {
	var isInRecovery sql.NullBool
	if err := c.db.QueryRow(`SELECT pg_is_in_recovery()`).Scan(&isInRecovery); err != nil {
		return nil, err
	}

	if !isInRecovery.Valid {
		return nil, fmt.Errorf("pg_is_in_recovery() returned null")
	}

	var fCurrentLsn, fReceiveLsn, fReplyLsn, fIsReplayPaused string
	switch {
	// the `pg_stat_wal_receiver` view has been introduced in 9.6
	case semver.MustParseRange(">=9.6.0 <10.0.0")(version):
		fCurrentLsn = "pg_current_xlog_location"
		fReceiveLsn = "pg_last_xlog_receive_location"
		fReplyLsn = "pg_last_xlog_replay_location"
		fIsReplayPaused = "pg_is_xlog_replay_paused"
	case semver.MustParseRange(">=10.0.0")(version):
		fCurrentLsn = "pg_current_wal_lsn"
		fReceiveLsn = "pg_last_wal_receive_lsn"
		fReplyLsn = "pg_last_wal_replay_lsn"
		fIsReplayPaused = "pg_is_wal_replay_paused"
	default:
		return nil, fmt.Errorf("postgres version %s is not supported", version)
	}

	rs := &replicationStatus{isInRecovery: isInRecovery.Bool}

	if !rs.isInRecovery {
		// Subtracting '0/0' turns the LSN into a plain number that can be
		// scanned into an integer.
		if err := c.db.QueryRow(fmt.Sprintf(`SELECT %s()-'0/0'`, fCurrentLsn)).Scan(&rs.currentLsn); err != nil {
			return nil, err
		}
		return rs, nil
	}

	// The receive/replay functions return NULL in some states (for example
	// when streaming replication has not started yet), and NULL - '0/0' is
	// NULL as well. Scan into nullable values so that such a standby still
	// reports the rest of its status; a NULL position is reported as 0.
	var receiveLsn, replyLsn sql.NullInt64
	if err := c.db.QueryRow(fmt.Sprintf(
		`SELECT %s()-'0/0', %s()-'0/0', %s()`, fReceiveLsn, fReplyLsn, fIsReplayPaused)).Scan(
		&receiveLsn, &replyLsn, &rs.isReplayPaused); err != nil {
		return nil, err
	}
	rs.receiveLsn = receiveLsn.Int64
	rs.replyLsn = replyLsn.Int64

	if err := c.db.QueryRow(`SELECT count(1) FROM pg_stat_wal_receiver`).Scan(&rs.walReceiverStatus); err != nil {
		return nil, err
	}

	// A missing primary_conninfo row is not an error: the field simply stays
	// empty.
	if err := c.db.QueryRow(`SELECT setting FROM pg_settings WHERE name='primary_conninfo'`).Scan(&rs.primaryConnectionInfo); err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			return nil, err
		}
	}
	return rs, nil
}
31 changes: 31 additions & 0 deletions collector/pg_replication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package collector

import (
"github.com/stretchr/testify/assert"
"testing"
)

// Test_parsePrimaryConnectionInfo checks that primaryHostPort extracts the
// expected host and port from keyword/value and URI connection strings.
func Test_parsePrimaryConnectionInfo(t *testing.T) {
	cases := []struct {
		conninfo string
		host     string
		port     string
	}{
		// keyword/value form
		{"host=127.0.0.1 port=5432", "127.0.0.1", "5432"},
		{"host=127.0.0.1", "127.0.0.1", ""},

		// whitespace and quoting around the value
		{"host = 127.0.0.1 port = 5432", "127.0.0.1", "5432"},
		{"host = '127.0.0.1' port = 5432", "127.0.0.1", "5432"},
		{"host = ' 127.0.0.1 ' port = 5432", "127.0.0.1", "5432"},

		// hostaddr is used when host is absent
		{"hostaddr=127.0.0.1 port=5432", "127.0.0.1", "5432"},

		// URI form
		{"postgresql://localhost:5433", "localhost", "5433"},
		{"postgres://localhost:5433", "localhost", "5433"},
		{"postgresql://user:secret@localhost", "localhost", ""},
		{"postgresql://other@localhost/otherdb?connect_timeout=10&application_name=myapp", "localhost", ""},
		{"postgresql://[2001:db8::1234]/database", "2001:db8::1234", ""},
	}

	for _, tc := range cases {
		rs := replicationStatus{primaryConnectionInfo: tc.conninfo}
		gotHost, gotPort, err := rs.primaryHostPort()
		assert.NoError(t, err)
		assert.Equal(t, tc.host, gotHost)
		assert.Equal(t, tc.port, gotPort)
	}
}

0 comments on commit e9c726c

Please sign in to comment.