Skip to content

Commit

Permalink
Refactor collectors to not depend of kingpin to be configured.
Browse files Browse the repository at this point in the history
This commit removes references to kingpin.CommandLine, allowing for the
collector package to be used and configured with a custom kingpin (or no
kingpin at all).

The configuration for collectors has been moved to struct fields, which
the kingpin flags populate at flag parse time.

Co-authored-by: Robert Fratto <[email protected]>
  • Loading branch information
marctc and rfratto committed Oct 5, 2023
1 parent c8b9064 commit 3678363
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 173 deletions.
26 changes: 16 additions & 10 deletions collector/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,23 @@ var (
versionRE = regexp.MustCompile(`^\d+\.\d+`)
)

// Tunable flags.
var (
exporterLockTimeout = kingpin.Flag(
// Config holds configuration options for the exporter.
type Config struct {
LockTimeout int
SlowLogFilter bool
}

// RegisterFlags adds flags to configure the exporter.
func (c *Config) RegisterFlags(application *kingpin.Application) {
application.Flag(
"exporter.lock_wait_timeout",
"Set a lock_wait_timeout (in seconds) on the connection to avoid long metadata locking.",
).Default("2").Int()
slowLogFilter = kingpin.Flag(
).Default("2").IntVar(&c.LockTimeout)
application.Flag(
"exporter.log_slow_filter",
"Add a log_slow_filter to avoid slow query logging of scrapes. NOTE: Not supported by Oracle MySQL.",
).Default("false").Bool()
)
).Default("false").BoolVar(&c.SlowLogFilter)
}

// metric definition
var (
Expand Down Expand Up @@ -95,11 +101,11 @@ type Exporter struct {
}

// New returns a new MySQL exporter for the provided DSN.
func New(ctx context.Context, dsn string, scrapers []Scraper, logger log.Logger) *Exporter {
func New(ctx context.Context, dsn string, scrapers []Scraper, logger log.Logger, cfg Config) *Exporter {
// Setup extra params for the DSN, default to having a lock timeout.
dsnParams := []string{fmt.Sprintf(timeoutParam, *exporterLockTimeout)}
dsnParams := []string{fmt.Sprintf(timeoutParam, cfg.LockTimeout)}

if *slowLogFilter {
if cfg.SlowLogFilter {
dsnParams = append(dsnParams, sessionSettingsParam)
}

Expand Down
10 changes: 10 additions & 0 deletions collector/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"testing"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,13 +34,22 @@ func TestExporter(t *testing.T) {
t.Skip("-short is passed, skipping test")
}

var exporterConfig Config
kingpinApp := kingpin.New("TestExporter", "")
exporterConfig.RegisterFlags(kingpinApp)
_, err := kingpinApp.Parse([]string{})
if err != nil {
t.Fatal(err)
}

exporter := New(
context.Background(),
dsn,
[]Scraper{
ScrapeGlobalStatus{},
},
log.NewNopLogger(),
exporterConfig,
)

convey.Convey("Metrics describing", t, func() {
Expand Down
45 changes: 25 additions & 20 deletions collector/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,6 @@ const (
heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`"
)

var (
collectHeartbeatDatabase = kingpin.Flag(
"collect.heartbeat.database",
"Database from where to collect heartbeat data",
).Default("heartbeat").String()
collectHeartbeatTable = kingpin.Flag(
"collect.heartbeat.table",
"Table from where to collect heartbeat data",
).Default("heartbeat").String()
collectHeartbeatUtc = kingpin.Flag(
"collect.heartbeat.utc",
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
).Bool()
)

// Metric descriptors.
var (
HeartbeatStoredDesc = prometheus.NewDesc(
Expand All @@ -74,7 +59,11 @@ var (
// server_id int unsigned NOT NULL PRIMARY KEY,
//
// );
type ScrapeHeartbeat struct{}
type ScrapeHeartbeat struct {
Database string
Table string
UTC bool
}

// Name of the Scraper. Should be unique.
func (ScrapeHeartbeat) Name() string {
Expand All @@ -91,17 +80,33 @@ func (ScrapeHeartbeat) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeHeartbeat) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.heartbeat.database",
"Database from where to collect heartbeat data",
).Default("heartbeat").StringVar(&s.Database)
application.Flag(
"collect.heartbeat.table",
"Table from where to collect heartbeat data",
).Default("heartbeat").StringVar(&s.Table)
application.Flag(
"collect.heartbeat.utc",
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
).BoolVar(&s.UTC)
}

// nowExpr returns a current timestamp expression.
func nowExpr() string {
if *collectHeartbeatUtc {
func (s ScrapeHeartbeat) nowExpr() string {
if s.UTC {
return "UTC_TIMESTAMP(6)"
}
return "NOW(6)"
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
func (s ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
query := fmt.Sprintf(heartbeatQuery, s.nowExpr(), s.Database, s.Table)
heartbeatRows, err := db.QueryContext(ctx, query)
if err != nil {
return err
Expand Down
9 changes: 7 additions & 2 deletions collector/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{
func TestScrapeHeartbeat(t *testing.T) {
for _, tt := range ScrapeHeartbeatTestCases {
t.Run(fmt.Sprint(tt.Args), func(t *testing.T) {
_, err := kingpin.CommandLine.Parse(tt.Args)
scraper := ScrapeHeartbeat{}

app := kingpin.New("TestScrapeHeartbeat", "")
scraper.RegisterFlags(app)

_, err := app.Parse(tt.Args)
if err != nil {
t.Fatal(err)
}
Expand All @@ -72,7 +77,7 @@ func TestScrapeHeartbeat(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = scraper.Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
47 changes: 26 additions & 21 deletions collector/info_schema_processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,6 @@ const infoSchemaProcesslistQuery = `
GROUP BY user, SUBSTRING_INDEX(host, ':', 1), command, state
`

// Tunable flags.
var (
processlistMinTime = kingpin.Flag(
"collect.info_schema.processlist.min_time",
"Minimum time a thread must be in each state to be counted",
).Default("0").Int()
processesByUserFlag = kingpin.Flag(
"collect.info_schema.processlist.processes_by_user",
"Enable collecting the number of processes by user",
).Default("true").Bool()
processesByHostFlag = kingpin.Flag(
"collect.info_schema.processlist.processes_by_host",
"Enable collecting the number of processes by host",
).Default("true").Bool()
)

// Metric descriptors.
var (
processlistCountDesc = prometheus.NewDesc(
Expand All @@ -79,7 +63,11 @@ var (
)

// ScrapeProcesslist collects from `information_schema.processlist`.
type ScrapeProcesslist struct{}
type ScrapeProcesslist struct {
ProcessListMinTime int
ProcessesByUserFlag bool
ProcessesByHostFlag bool
}

// Name of the Scraper. Should be unique.
func (ScrapeProcesslist) Name() string {
Expand All @@ -96,11 +84,27 @@ func (ScrapeProcesslist) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeProcesslist) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.info_schema.processlist.min_time",
"Minimum time a thread must be in each state to be counted",
).Default("0").IntVar(&s.ProcessListMinTime)
application.Flag(
"collect.info_schema.processlist.processes_by_user",
"Enable collecting the number of processes by user",
).Default("true").BoolVar(&s.ProcessesByUserFlag)
application.Flag(
"collect.info_schema.processlist.processes_by_host",
"Enable collecting the number of processes by host",
).Default("true").BoolVar(&s.ProcessesByHostFlag)
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (s ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
processQuery := fmt.Sprintf(
infoSchemaProcesslistQuery,
*processlistMinTime,
s.ProcessListMinTime,
)
processlistRows, err := db.QueryContext(ctx, processQuery)
if err != nil {
Expand Down Expand Up @@ -162,12 +166,13 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
}
}

if *processesByHostFlag {
if s.ProcessesByHostFlag {
for _, host := range sortedMapKeys(stateHostCounts) {
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
}
}
if *processesByUserFlag {

if s.ProcessesByUserFlag {
for _, user := range sortedMapKeys(stateUserCounts) {
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
}
Expand Down
8 changes: 6 additions & 2 deletions collector/info_schema_processlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
)

func TestScrapeProcesslist(t *testing.T) {
_, err := kingpin.CommandLine.Parse([]string{
scraper := ScrapeProcesslist{}
app := kingpin.New("TestScrapeProcesslist", "")
scraper.RegisterFlags(app)

_, err := app.Parse([]string{
"--collect.info_schema.processlist.processes_by_user",
"--collect.info_schema.processlist.processes_by_host",
})
Expand Down Expand Up @@ -56,7 +60,7 @@ func TestScrapeProcesslist(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeProcesslist{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = scraper.Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
26 changes: 14 additions & 12 deletions collector/info_schema_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ const (
`
)

// Tunable flags.
var (
tableSchemaDatabases = kingpin.Flag(
"collect.info_schema.tables.databases",
"The list of databases to collect table stats for, or '*' for all",
).Default("*").String()
)

// Metric descriptors.
var (
infoSchemaTablesVersionDesc = prometheus.NewDesc(
Expand All @@ -79,7 +71,9 @@ var (
)

// ScrapeTableSchema collects from `information_schema.tables`.
type ScrapeTableSchema struct{}
type ScrapeTableSchema struct {
Databases string
}

// Name of the Scraper. Should be unique.
func (ScrapeTableSchema) Name() string {
Expand All @@ -96,10 +90,18 @@ func (ScrapeTableSchema) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeTableSchema) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.info_schema.tables.databases",
"The list of databases to collect table stats for, or '*' for all",
).Default("*").StringVar(&s.Databases)
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeTableSchema) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (s ScrapeTableSchema) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
var dbList []string
if *tableSchemaDatabases == "*" {
if s.Databases == "*" {
dbListRows, err := db.QueryContext(ctx, dbListQuery)
if err != nil {
return err
Expand All @@ -117,7 +119,7 @@ func (ScrapeTableSchema) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
dbList = append(dbList, database)
}
} else {
dbList = strings.Split(*tableSchemaDatabases, ",")
dbList = strings.Split(s.Databases, ",")
}

for _, database := range dbList {
Expand Down
24 changes: 13 additions & 11 deletions collector/mysql_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@ const mysqlUserQuery = `
FROM mysql.user
`

// Tunable flags.
var (
userPrivilegesFlag = kingpin.Flag(
"collect.mysql.user.privileges",
"Enable collecting user privileges from mysql.user",
).Default("false").Bool()
)

var (
labelNames = []string{"mysql_user", "hostmask"}
)
Expand All @@ -102,7 +94,9 @@ var (
)

// ScrapeUser collects from `information_schema.processlist`.
type ScrapeUser struct{}
type ScrapeUser struct {
Privileges bool
}

// Name of the Scraper. Should be unique.
func (ScrapeUser) Name() string {
Expand All @@ -119,8 +113,16 @@ func (ScrapeUser) Version() float64 {
return 5.1
}

// RegisterFlags adds flags to configure the Scraper.
func (s *ScrapeUser) RegisterFlags(application *kingpin.Application) {
application.Flag(
"collect.mysql.user.privileges",
"Enable collecting user privileges from mysql.user",
).Default("false").BoolVar(&s.Privileges)
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeUser) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (s ScrapeUser) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
var (
userRows *sql.Rows
err error
Expand Down Expand Up @@ -213,7 +215,7 @@ func (ScrapeUser) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.M
return err
}

if *userPrivilegesFlag {
if s.Privileges {
userCols, err := userRows.Columns()
if err != nil {
return err
Expand Down
Loading

0 comments on commit 3678363

Please sign in to comment.