Skip to content
This repository has been archived by the owner on Aug 24, 2022. It is now read-only.

PMM-9632 Settings view usage. #374

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
211 changes: 112 additions & 99 deletions agents/postgres/pgstatmonitor/pgstatmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (

"github.com/AlekSi/pointer"
ver "github.com/hashicorp/go-version"
"github.com/lib/pq" //nolint:gci
_ "github.com/lib/pq" // register SQL driver.
"github.com/lib/pq"
"github.com/percona/pmm/api/agentpb"
"github.com/percona/pmm/api/inventorypb"
"github.com/percona/pmm/utils/sqlmetrics"
Expand All @@ -45,20 +44,12 @@ const defaultWaitTime = 60 * time.Second

// PGStatMonitorQAN QAN services connects to PostgreSQL and extracts stats.
type PGStatMonitorQAN struct {
q *reform.Querier
dbCloser io.Closer
agentID string
l *logrus.Entry
changes chan agents.Change
monitorCache *statMonitorCache

// By default, query shows the actual parameter instead of the placeholder.
BupycHuk marked this conversation as resolved.
Show resolved Hide resolved
// It is quite useful when users want to use that query and try to run that
// query to check the abnormalities. But in most cases users like the queries
// with a placeholder. This parameter is used to toggle between the two said
// options.
pgsmNormalizedQuery bool
waitTime time.Duration
q *reform.Querier
dbCloser io.Closer
agentID string
l *logrus.Entry
changes chan agents.Change
monitorCache *statMonitorCache
disableQueryExamples bool
}

Expand Down Expand Up @@ -92,7 +83,7 @@ const (
commandTypeUpdate = "UPDATE"
commandTypeInsert = "INSERT"
commandTypeDelete = "DELETE"
commandTypeUtiity = "UTILITY"
commandTypeUtility = "UTILITY"
Copy link
Contributor Author

@JiriCtvrtka JiriCtvrtka May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already fixed in previous PR, but its not merged yet.

)

var commandTypeToText = []string{
Expand All @@ -101,7 +92,7 @@ var commandTypeToText = []string{
commandTypeUpdate,
commandTypeInsert,
commandTypeDelete,
commandTypeUtiity,
commandTypeUtility,
commandTextNotAvailable,
}

Expand All @@ -122,25 +113,6 @@ func New(params *Params, l *logrus.Entry) (*PGStatMonitorQAN, error) {
return newPgStatMonitorQAN(q, sqlDB, params.AgentID, params.DisableQueryExamples, l)
}

func isPropertyValueInt(property string) bool {
switch property {
case
"pg_stat_monitor.pgsm_histogram_max",
"pg_stat_monitor.pgsm_query_max_len",
"pg_stat_monitor.pgsm_max",
"pg_stat_monitor.pgsm_bucket_time",
"pg_stat_monitor.pgsm_query_shared_buffer",
"pg_stat_monitor.pgsm_max_buckets",
"pg_stat_monitor.pgsm_histogram_buckets",
"pg_stat_monitor.pgsm_overflow_target",
"pg_stat_monitor.pgsm_histogram_min":

return true
}

return false
}

func areSettingsTextValues(q *reform.Querier) (bool, error) {
pgsmVersion, prerelease, err := getPGMonitorVersion(q)
if err != nil {
Expand All @@ -155,66 +127,13 @@ func areSettingsTextValues(q *reform.Querier) (bool, error) {
}

func newPgStatMonitorQAN(q *reform.Querier, dbCloser io.Closer, agentID string, disableQueryExamples bool, l *logrus.Entry) (*PGStatMonitorQAN, error) {
var settings []reform.Struct

settingsValuesAreText, err := areSettingsTextValues(q)
if err != nil {
return nil, err
}
if settingsValuesAreText {
settings, err = q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "")
} else {
settings, err = q.SelectAllFrom(pgStatMonitorSettingsView, "")
}
if err != nil {
return nil, errors.Wrap(err, "failed to get settings")
}

var normalizedQuery bool
waitTime := defaultWaitTime
for _, row := range settings {
var name string
var value int64

if settingsValuesAreText {
setting := row.(*pgStatMonitorSettingsTextValue)
name = setting.Name
if !isPropertyValueInt(name) {
continue
}

valueInt, err := strconv.ParseInt(setting.Value, 10, 64)
if err != nil {
return nil, errors.Wrap(err, "value cannot be parsed as integer")
}
value = valueInt
} else {
setting := row.(*pgStatMonitorSettings)
name = setting.Name
value = setting.Value
}

if err == nil {
switch name {
case "pg_stat_monitor.pgsm_normalized_query":
normalizedQuery = value == 1
case "pg_stat_monitor.pgsm_bucket_time":
if value < int64(defaultWaitTime.Seconds()) {
waitTime = time.Duration(value) * time.Second
}
}
}
}

return &PGStatMonitorQAN{
q: q,
dbCloser: dbCloser,
agentID: agentID,
l: l,
changes: make(chan agents.Change, 10),
monitorCache: newStatMonitorCache(l),
pgsmNormalizedQuery: normalizedQuery,
waitTime: waitTime,
disableQueryExamples: disableQueryExamples,
}, nil
}
Expand Down Expand Up @@ -276,10 +195,21 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
close(m.changes)
}()

settings, err := m.getSettings()
if err != nil {
m.l.Error(err)
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}
normalizedQuery, err := settings.getNormalizedQueryValue()
if err != nil {
m.l.Error(err)
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}

// add current stat monitor to cache so they are not send as new on first iteration with incorrect timestamps
var running bool
m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING}
if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery); err == nil {
if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery); err == nil {
m.monitorCache.refresh(current)
m.l.Debugf("Got %d initial stat monitor.", len(current))
running = true
Expand All @@ -289,10 +219,15 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}

waitTime, err := settings.getWaitTime()
if err != nil {
m.l.Warning(err)
}

// query pg_stat_monitor every waitTime seconds
start := time.Now()
m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05"))
t := time.NewTimer(m.waitTime)
m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05"))
t := time.NewTimer(waitTime)
defer t.Stop()

for {
Expand All @@ -307,12 +242,27 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING}
}

lengthS := uint32(m.waitTime.Seconds())
buckets, err := m.getNewBuckets(ctx, lengthS)
settings, err := m.getSettings()
if err != nil {
m.l.Errorf(err.Error())
running = false
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
continue
}
normalizedQuery, err := settings.getNormalizedQueryValue()
if err != nil {
m.l.Errorf(err.Error())
running = false
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
continue
}

lengthS := uint32(waitTime.Seconds())
buckets, err := m.getNewBuckets(ctx, lengthS, normalizedQuery)

start = time.Now()
m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05"))
t.Reset(m.waitTime)
m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05"))
t.Reset(waitTime)

if err != nil {
m.l.Error(errors.Wrap(err, "getNewBuckets failed"))
Expand All @@ -331,8 +281,71 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
}
}

func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32) ([]*agentpb.MetricsBucket, error) {
current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery)
type settings map[string]*pgStatMonitorSettingsTextValue

func (m *PGStatMonitorQAN) getSettings() (settings, error) {
var settingsRows []reform.Struct

settingsValuesAreText, err := areSettingsTextValues(m.q)
if err != nil {
return nil, err
}
if settingsValuesAreText {
settingsRows, err = m.q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "")
} else {
settingsRows, err = m.q.SelectAllFrom(pgStatMonitorSettingsView, "")
}
if err != nil {
return nil, errors.Wrap(err, "failed to get settings")
}

settings := make(settings)
for _, row := range settingsRows {
if settingsValuesAreText {
setting := row.(*pgStatMonitorSettingsTextValue)
settings[setting.Name] = setting
} else {
setting := row.(*pgStatMonitorSettings)
name := setting.Name
settings[name] = &pgStatMonitorSettingsTextValue{
Name: name,
Value: fmt.Sprintf("%d", setting.Value),
}
}
}

return settings, nil
}

func (s settings) getNormalizedQueryValue() (bool, error) {
key := "pg_stat_monitor.pgsm_normalized_query"
if _, ok := s[key]; !ok {
return false, errors.New("failed to get pgsm_normalized_query property")
}

if s[key].Value == "yes" || s[key].Value == "1" {
return true, nil
}

return false, nil
}

func (s settings) getWaitTime() (time.Duration, error) {
key := "pg_stat_monitor.pgsm_bucket_time"
if _, ok := s[key]; !ok {
return defaultWaitTime, errors.New("failed to get pgsm_bucket_time, wait time set on 60 seconds")
}

valueInt, err := strconv.ParseInt(s[key].Value, 10, 64)
if err != nil {
return defaultWaitTime, errors.Wrap(err, "property pgsm_bucket_time cannot be parsed as integer, wait time set on 60 seconds")
}

return time.Duration(valueInt) * time.Second, nil
}

func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32, normalizedQuery bool) ([]*agentpb.MetricsBucket, error) {
current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery)
if err != nil {
return nil, err
}
Expand Down
28 changes: 22 additions & 6 deletions agents/postgres/pgstatmonitor/pgstatmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err := db.Exec(selectAllCountries)
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60)
settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand All @@ -213,7 +218,8 @@ func TestPGStatMonitorSchema(t *testing.T) {
assert.Equal(t, float32(5), actual.Postgresql.MSharedBlksHitSum+actual.Postgresql.MSharedBlksReadSum)
assert.InDelta(t, 1.5, actual.Postgresql.MSharedBlksHitCnt+actual.Postgresql.MSharedBlksReadCnt, 0.5)
example := ""
if !m.pgsmNormalizedQuery && !m.disableQueryExamples {

if !normalizedQuery && !m.disableQueryExamples {
example = actual.Common.Example
}

Expand Down Expand Up @@ -264,7 +270,7 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err = db.Exec(selectAllCountries)
require.NoError(t, err)

buckets, err = m.getNewBuckets(context.Background(), 60)
buckets, err = m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down Expand Up @@ -327,7 +333,12 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err := db.Exec(q, args...)
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60)
settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

buckets, err := m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down Expand Up @@ -385,7 +396,7 @@ func TestPGStatMonitorSchema(t *testing.T) {
_, err = db.Exec(q, args...)
require.NoError(t, err)

buckets, err = m.getNewBuckets(context.Background(), 60)
buckets, err = m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down Expand Up @@ -469,9 +480,14 @@ func TestPGStatMonitorSchema(t *testing.T) {
}
waitGroup.Wait()

settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

var buckets []*agentpb.MetricsBucket
for i := 0; i < 100; i++ {
buckets, err = m.getNewBuckets(context.Background(), 60)
buckets, err = m.getNewBuckets(context.Background(), 60, normalizedQuery)
require.NoError(t, err)
buckets = filter(buckets)
t.Logf("Actual:\n%s", tests.FormatBuckets(buckets))
Expand Down
7 changes: 6 additions & 1 deletion agents/postgres/pgstatmonitor/stat_monitor_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func TestPGStatMonitorStructs(t *testing.T) {
}()

m := setup(t, db, false)
current, cache, err := m.monitorCache.getStatMonitorExtended(context.TODO(), db.Querier, m.pgsmNormalizedQuery)
settings, err := m.getSettings()
assert.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
assert.NoError(t, err)

current, cache, err := m.monitorCache.getStatMonitorExtended(context.TODO(), db.Querier, normalizedQuery)

require.NoError(t, err)
require.NotNil(t, current)
Expand Down