Skip to content
This repository has been archived by the owner on Aug 24, 2022. It is now read-only.

PMM-9632 Settings view usage. #374

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
10 changes: 8 additions & 2 deletions agents/postgres/pgstatmonitor/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ type pgStatMonitorSettings struct {
//
//reform:pg_stat_monitor_settings
type pgStatMonitorSettingsTextValue struct {
Name string `reform:"name"`
Value string `reform:"value"`
Name string `reform:"name"`
Value string `reform:"value"`
DefaultValue string `reform:"default_value"`
Description string `reform:"description"`
Minimum *int64 `reform:"minimum"`
Maximum *int64 `reform:"maximum"`
Options *string `reform:"options"`
Restart string `reform:"restart"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep only fields that we use in case PGSM team changes structure later we will have more PMM clients supported latest PGSM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// pgStatMonitorExtended contains pgStatMonitor data and extends it with database, username and tables data.
Expand Down
32 changes: 31 additions & 1 deletion agents/postgres/pgstatmonitor/models_reform.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

201 changes: 104 additions & 97 deletions agents/postgres/pgstatmonitor/pgstatmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (

"github.com/AlekSi/pointer"
ver "github.com/hashicorp/go-version"
"github.com/lib/pq" //nolint:gci
_ "github.com/lib/pq" // register SQL driver.
"github.com/lib/pq"
"github.com/percona/pmm/api/agentpb"
"github.com/percona/pmm/api/inventorypb"
"github.com/percona/pmm/utils/sqlmetrics"
Expand All @@ -45,20 +44,12 @@ const defaultWaitTime = 60 * time.Second

// PGStatMonitorQAN QAN services connects to PostgreSQL and extracts stats.
type PGStatMonitorQAN struct {
q *reform.Querier
dbCloser io.Closer
agentID string
l *logrus.Entry
changes chan agents.Change
monitorCache *statMonitorCache

// By default, query shows the actual parameter instead of the placeholder.
BupycHuk marked this conversation as resolved.
Show resolved Hide resolved
// It is quite useful when users want to use that query and try to run that
// query to check the abnormalities. But in most cases users like the queries
// with a placeholder. This parameter is used to toggle between the two said
// options.
pgsmNormalizedQuery bool
waitTime time.Duration
q *reform.Querier
dbCloser io.Closer
agentID string
l *logrus.Entry
changes chan agents.Change
monitorCache *statMonitorCache
disableQueryExamples bool
}

Expand Down Expand Up @@ -92,7 +83,7 @@ const (
commandTypeUpdate = "UPDATE"
commandTypeInsert = "INSERT"
commandTypeDelete = "DELETE"
commandTypeUtiity = "UTILITY"
commandTypeUtility = "UTILITY"
Copy link
Contributor Author

@JiriCtvrtka JiriCtvrtka May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already fixed in previous PR, but its not merged yet.

)

var commandTypeToText = []string{
Expand All @@ -101,7 +92,7 @@ var commandTypeToText = []string{
commandTypeUpdate,
commandTypeInsert,
commandTypeDelete,
commandTypeUtiity,
commandTypeUtility,
commandTextNotAvailable,
}

Expand All @@ -122,25 +113,6 @@ func New(params *Params, l *logrus.Entry) (*PGStatMonitorQAN, error) {
return newPgStatMonitorQAN(q, sqlDB, params.AgentID, params.DisableQueryExamples, l)
}

func isPropertyValueInt(property string) bool {
switch property {
case
"pg_stat_monitor.pgsm_histogram_max",
"pg_stat_monitor.pgsm_query_max_len",
"pg_stat_monitor.pgsm_max",
"pg_stat_monitor.pgsm_bucket_time",
"pg_stat_monitor.pgsm_query_shared_buffer",
"pg_stat_monitor.pgsm_max_buckets",
"pg_stat_monitor.pgsm_histogram_buckets",
"pg_stat_monitor.pgsm_overflow_target",
"pg_stat_monitor.pgsm_histogram_min":

return true
}

return false
}

func areSettingsTextValues(q *reform.Querier) (bool, error) {
pgsmVersion, prerelease, err := getPGMonitorVersion(q)
if err != nil {
Expand All @@ -155,66 +127,13 @@ func areSettingsTextValues(q *reform.Querier) (bool, error) {
}

func newPgStatMonitorQAN(q *reform.Querier, dbCloser io.Closer, agentID string, disableQueryExamples bool, l *logrus.Entry) (*PGStatMonitorQAN, error) {
var settings []reform.Struct

settingsValuesAreText, err := areSettingsTextValues(q)
if err != nil {
return nil, err
}
if settingsValuesAreText {
settings, err = q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "")
} else {
settings, err = q.SelectAllFrom(pgStatMonitorSettingsView, "")
}
if err != nil {
return nil, errors.Wrap(err, "failed to get settings")
}

var normalizedQuery bool
waitTime := defaultWaitTime
for _, row := range settings {
var name string
var value int64

if settingsValuesAreText {
setting := row.(*pgStatMonitorSettingsTextValue)
name = setting.Name
if !isPropertyValueInt(name) {
continue
}

valueInt, err := strconv.ParseInt(setting.Value, 10, 64)
if err != nil {
return nil, errors.Wrap(err, "value cannot be parsed as integer")
}
value = valueInt
} else {
setting := row.(*pgStatMonitorSettings)
name = setting.Name
value = setting.Value
}

if err == nil {
switch name {
case "pg_stat_monitor.pgsm_normalized_query":
normalizedQuery = value == 1
case "pg_stat_monitor.pgsm_bucket_time":
if value < int64(defaultWaitTime.Seconds()) {
waitTime = time.Duration(value) * time.Second
}
}
}
}

return &PGStatMonitorQAN{
q: q,
dbCloser: dbCloser,
agentID: agentID,
l: l,
changes: make(chan agents.Change, 10),
monitorCache: newStatMonitorCache(l),
pgsmNormalizedQuery: normalizedQuery,
waitTime: waitTime,
disableQueryExamples: disableQueryExamples,
}, nil
}
Expand Down Expand Up @@ -276,10 +195,21 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
close(m.changes)
}()

settings, err := m.getSettings()
if err != nil {
m.l.Error(err)
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}
normalizedQuery, err := settings.getNormalizedQueryValue()
if err != nil {
m.l.Error(err)
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}

// add current stat monitor to cache so they are not send as new on first iteration with incorrect timestamps
var running bool
m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING}
if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery); err == nil {
if current, _, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery); err == nil {
m.monitorCache.refresh(current)
m.l.Debugf("Got %d initial stat monitor.", len(current))
running = true
Expand All @@ -289,10 +219,15 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
m.changes <- agents.Change{Status: inventorypb.AgentStatus_WAITING}
}

waitTime, err := settings.getWaitTime()
if err != nil {
m.l.Warning(err)
}

// query pg_stat_monitor every waitTime seconds
start := time.Now()
m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05"))
t := time.NewTimer(m.waitTime)
m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05"))
t := time.NewTimer(waitTime)
defer t.Stop()

for {
Expand All @@ -307,12 +242,12 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
m.changes <- agents.Change{Status: inventorypb.AgentStatus_STARTING}
}

lengthS := uint32(m.waitTime.Seconds())
lengthS := uint32(waitTime.Seconds())
buckets, err := m.getNewBuckets(ctx, lengthS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can call settings here and pass them to the getNewBuckets method, in that case, you will be able to support immediate change for waitTime as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


start = time.Now()
m.l.Debugf("Scheduling next collection in %s at %s.", m.waitTime, start.Add(m.waitTime).Format("15:04:05"))
t.Reset(m.waitTime)
m.l.Debugf("Scheduling next collection in %s at %s.", waitTime, start.Add(waitTime).Format("15:04:05"))
t.Reset(waitTime)

if err != nil {
m.l.Error(errors.Wrap(err, "getNewBuckets failed"))
Expand All @@ -331,8 +266,80 @@ func (m *PGStatMonitorQAN) Run(ctx context.Context) {
}
}

type settings map[string]*pgStatMonitorSettingsTextValue

func (m *PGStatMonitorQAN) getSettings() (settings, error) {
var settingsRows []reform.Struct

settingsValuesAreText, err := areSettingsTextValues(m.q)
if err != nil {
return nil, err
}
if settingsValuesAreText {
settingsRows, err = m.q.SelectAllFrom(pgStatMonitorSettingsTextValueView, "")
} else {
settingsRows, err = m.q.SelectAllFrom(pgStatMonitorSettingsView, "")
}
if err != nil {
return nil, errors.Wrap(err, "failed to get settings")
}

settings := make(settings)
for _, row := range settingsRows {
if settingsValuesAreText {
setting := row.(*pgStatMonitorSettingsTextValue)
settings[setting.Name] = setting
} else {
setting := row.(*pgStatMonitorSettings)
name := setting.Name
settings[name] = &pgStatMonitorSettingsTextValue{
Name: name,
Value: fmt.Sprintf("%d", setting.Value),
}
}
}

return settings, nil
}

func (s settings) getNormalizedQueryValue() (bool, error) {
key := "pg_stat_monitor.pgsm_normalized_query"
if _, ok := s[key]; !ok {
return false, errors.New("failed to get pgsm_normalized_query property")
}

if s[key].Value == "yes" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I got it right this value might be "1" as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes string "1" for older versions. Thank you.

return true, nil
}

return false, nil
}

func (s settings) getWaitTime() (time.Duration, error) {
key := "pg_stat_monitor.pgsm_bucket_time"
if _, ok := s[key]; !ok {
return defaultWaitTime, errors.New("failed to get pgsm_bucket_time, wait time set on 60 seconds")
}

valueInt, err := strconv.ParseInt(s[key].Value, 10, 64)
if err != nil {
return defaultWaitTime, errors.Wrap(err, "property pgsm_bucket_time cannot be parsed as integer, wait time set on 60 seconds")
}

return time.Duration(valueInt) * time.Second, nil
}

func (m *PGStatMonitorQAN) getNewBuckets(ctx context.Context, periodLengthSecs uint32) ([]*agentpb.MetricsBucket, error) {
current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, m.pgsmNormalizedQuery)
settings, err := m.getSettings()
if err != nil {
m.l.Errorf(err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get an error here, shouldn't we stop the method and return an error?
The same below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Moved before line 245 in pgstatmonitor.go as you required above and added continue there.

}
normalizedQuery, err := settings.getNormalizedQueryValue()
if err != nil {
m.l.Errorf(err.Error())
}

current, prev, err := m.monitorCache.getStatMonitorExtended(ctx, m.q, normalizedQuery)
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion agents/postgres/pgstatmonitor/pgstatmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,13 @@ func TestPGStatMonitorSchema(t *testing.T) {
assert.Equal(t, float32(5), actual.Postgresql.MSharedBlksHitSum+actual.Postgresql.MSharedBlksReadSum)
assert.InDelta(t, 1.5, actual.Postgresql.MSharedBlksHitCnt+actual.Postgresql.MSharedBlksReadCnt, 0.5)
example := ""
if !m.pgsmNormalizedQuery && !m.disableQueryExamples {

settings, err := m.getSettings()
require.NoError(t, err)
normalizedQuery, err := settings.getNormalizedQueryValue()
require.NoError(t, err)

if !normalizedQuery && !m.disableQueryExamples {
example = actual.Common.Example
}

Expand Down
Loading