Skip to content

Commit

Permalink
Merge pull request #12 from coroot/query_size_limit
Browse files Browse the repository at this point in the history
Limiting the size of queries
  • Loading branch information
apetruhin authored Dec 11, 2022
2 parents baad2c3 + 24ff87a commit e08ac37
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
25 changes: 22 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"time"
)

const topQueriesN = 20
const (
topQueriesN = 20
hardQuerySizeLimit = 4096
)

var (
dUp = desc("pg_up", "Is the server reachable")
Expand Down Expand Up @@ -134,14 +137,30 @@ func (c *Collector) snapshot() {
c.logger.Warning(err)
}

querySizeLimit := 0
for _, s := range c.settings {
if s.Name == "track_activity_query_size" {
switch s.Unit {
case "B":
querySizeLimit = int(s.Value)
case "kB":
querySizeLimit = int(s.Value) * 1024
}
break
}
}
if querySizeLimit == 0 || querySizeLimit > hardQuerySizeLimit {
querySizeLimit = hardQuerySizeLimit
}

c.ssPrev = c.ssCurr
c.saPrev = c.saCurr
c.ssCurr, err = c.getStatStatements(version)
c.ssCurr, err = c.getStatStatements(version, querySizeLimit)
if err != nil {
c.logger.Warning(err)
return
}
c.saCurr, err = c.getPgStatActivity(version)
c.saCurr, err = c.getPgStatActivity(version, querySizeLimit)
if err != nil {
c.logger.Warning(err)
return
Expand Down
10 changes: 5 additions & 5 deletions collector/pg_stat_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@ type saSnapshot struct {
connections map[int]Connection
}

func (c *Collector) getPgStatActivity(version semver.Version) (*saSnapshot, error) {
func (c *Collector) getPgStatActivity(version semver.Version, querySizeLimit int) (*saSnapshot, error) {
snapshot := &saSnapshot{connections: map[int]Connection{}}
var query string
switch {
case semver.MustParseRange(">=9.3.0 <9.6.0")(version):
query += "SELECT s.pid, s.datname, s.usename, s.query, s.state, now(), s.query_start, s.waiting, null, null, null"
query = "SELECT s.pid, s.datname, s.usename, LEFT(s.query, %d), s.state, now(), s.query_start, s.waiting, null, null, null"
case semver.MustParseRange(">=9.6.0 <10.0.0")(version):
query += "SELECT s.pid, s.datname, s.usename, s.query, s.state, now(), s.query_start, null, s.wait_event_type, null, (pg_blocking_pids(s.pid))[1]"
query = "SELECT s.pid, s.datname, s.usename, LEFT(s.query, %d), s.state, now(), s.query_start, null, s.wait_event_type, null, (pg_blocking_pids(s.pid))[1]"
case semver.MustParseRange(">=10.0.0")(version):
query += "SELECT s.pid, s.datname, s.usename, s.query, s.state, now(), s.query_start, null, s.wait_event_type, s.backend_type, (pg_blocking_pids(s.pid))[1]"
query = "SELECT s.pid, s.datname, s.usename, LEFT(s.query, %d), s.state, now(), s.query_start, null, s.wait_event_type, s.backend_type, (pg_blocking_pids(s.pid))[1]"
default:
return nil, fmt.Errorf("postgres version %s is not supported", version)
}
query += " FROM pg_stat_activity s JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate"
rows, err := c.db.Query(query)
rows, err := c.db.Query(fmt.Sprintf(query, querySizeLimit))
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions collector/pg_stat_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ type ssSnapshot struct {
rows map[statementId]ssRow
}

func (c *Collector) getStatStatements(version semver.Version) (*ssSnapshot, error) {
func (c *Collector) getStatStatements(version semver.Version, querySizeLimit int) (*ssSnapshot, error) {
snapshot := &ssSnapshot{ts: time.Now(), rows: map[statementId]ssRow{}}
var query string
switch {
case semver.MustParseRange(">=9.4.0 <13.0.0")(version):
query = `SELECT d.datname, r.rolname, s.query, s.queryid, s.calls, s.total_time, s.blk_read_time + s.blk_write_time`
query = `SELECT d.datname, r.rolname, LEFT(s.query, %d), s.queryid, s.calls, s.total_time, s.blk_read_time + s.blk_write_time`
case semver.MustParseRange(">=13.0.0")(version):
query = `SELECT d.datname, r.rolname, s.query, s.queryid, s.calls, s.total_plan_time + s.total_exec_time, s.blk_read_time + s.blk_write_time`
query = `SELECT d.datname, r.rolname, LEFT(s.query, %d), s.queryid, s.calls, s.total_plan_time + s.total_exec_time, s.blk_read_time + s.blk_write_time`
default:
return nil, fmt.Errorf("postgres version %s is not supported", version)
}
query += ` FROM pg_stat_statements s JOIN pg_roles r ON r.oid=s.userid JOIN pg_database d ON d.oid=s.dbid AND NOT d.datistemplate`
rows, err := c.db.Query(query)
rows, err := c.db.Query(fmt.Sprintf(query, querySizeLimit))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e08ac37

Please sign in to comment.