Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mysql adapter #2

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
28 changes: 28 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module github.com/bolcom/prometheus-remote-storage-adapter

go 1.18

require (
github.com/Sirupsen/logrus v0.11.5
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a
github.com/gogo/protobuf v0.0.0-20170307180453-100ba4e88506
github.com/golang/protobuf v0.0.0-20170509165901-b50ceb1fa981
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/influxdata/influxdb v1.2.4
github.com/matttproud/golang_protobuf_extensions v1.0.0
github.com/prometheus/client_golang v0.8.0
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612
github.com/prometheus/common v0.0.0-20170427095455-13ba4ddd0caa
github.com/prometheus/procfs v0.0.0-20170502131342-d098ca18df8b
github.com/prometheus/prometheus v1.6.1
github.com/syndtr/goleveldb v0.0.0-20170409015612-8c81ea47d4c4
golang.org/x/net v0.0.0-20161116075034-4971afdc2f16
golang.org/x/sys v0.0.0-20170511214829-9c9d83fe39ed
golang.org/x/time v0.0.0-20170424234030-8be79e1e0910
gopkg.in/yaml.v2 v2.0.0-20160928153709-a5b47d31c556
)

require (
github.com/go-sql-driver/mysql v1.7.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
38 changes: 38 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
github.com/Sirupsen/logrus v0.11.5 h1:aIMrrsnipdTlAieMe7FC/iiuJ0+ELiXCT4YiVQiK9j8=
github.com/Sirupsen/logrus v0.11.5/go.mod h1:rmk17hk6i8ZSAJkSDa7nOxamrG+SP4P0mm+DAvExv4U=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a h1:BtpsbiV638WQZwhA98cEZw2BsbnQJrbd0BI7tsy0W1c=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/gogo/protobuf v0.0.0-20170307180453-100ba4e88506 h1:zDlw+wgyXdfkRuvFCdEDUiPLmZp2cvf/dWHazY0a5VM=
github.com/gogo/protobuf v0.0.0-20170307180453-100ba4e88506/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v0.0.0-20170509165901-b50ceb1fa981 h1:dGeVS+GokejawVlaXqp2v9D+S5MRIgdsSS48pGX3tYs=
github.com/golang/protobuf v0.0.0-20170509165901-b50ceb1fa981/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/influxdata/influxdb v1.2.4 h1:eZ6+LGnBO9X/JgoYHAGw7WqL3mZtiwpya/WqasVj+uM=
github.com/influxdata/influxdb v1.2.4/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc=
github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20170427095455-13ba4ddd0caa h1:WBOqSBZzK9pqPXiewLT2aL9evdTCy4hUefz0h3iJGGI=
github.com/prometheus/common v0.0.0-20170427095455-13ba4ddd0caa/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20170502131342-d098ca18df8b h1:NXG7/u0x9fMr3enQ2oO3yLpF4pgT8lWT/Qoy3Qb4r54=
github.com/prometheus/procfs v0.0.0-20170502131342-d098ca18df8b/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/prometheus v1.6.1 h1:ARfu54mxz0/LYOYcxflKtWXZm9gMfteG8xIcuMV2rR0=
github.com/prometheus/prometheus v1.6.1/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/syndtr/goleveldb v0.0.0-20170409015612-8c81ea47d4c4 h1:PoqFAtRY0Q02baZW5o00/NOTpTdTwVl+x1UnvpYK0Dc=
github.com/syndtr/goleveldb v0.0.0-20170409015612-8c81ea47d4c4/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
golang.org/x/net v0.0.0-20161116075034-4971afdc2f16 h1:x2xFZACPoDbV+g+48fDH/4EQTTNPgHTRko7g0JQiZws=
golang.org/x/net v0.0.0-20161116075034-4971afdc2f16/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20170511214829-9c9d83fe39ed/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/time v0.0.0-20170424234030-8be79e1e0910 h1:bCMaBn7ph495H+x72gEvgcv+mDRd9dElbzo/mVCMxX4=
golang.org/x/time v0.0.0-20170424234030-8be79e1e0910/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.0.0-20160928153709-a5b47d31c556 h1:hKXbLW5oaJoQgs8KrzTLdF4PoHi+0oQPgea9TNtvE3E=
gopkg.in/yaml.v2 v2.0.0-20160928153709-a5b47d31c556/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
25 changes: 25 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"flag"
"fmt"
"github.com/bolcom/prometheus-remote-storage-adapter/mysql"
"io/ioutil"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -52,6 +53,11 @@ type config struct {
remoteTimeout time.Duration
listenAddr string
telemetryPath string
mysqlAddress string
mysqlPort string
mysqlUsername string
mysqlPassword string
mysqlDatabase string
}

var (
Expand Down Expand Up @@ -105,6 +111,21 @@ func parseFlags() *config {
influxdbPassword: os.Getenv("INFLUXDB_PW"),
}

flag.StringVar(&cfg.mysqlAddress, "mysql-address", "localhost",
"The mysql server connection address. None, if empty.",
)
flag.StringVar(&cfg.mysqlPort, "mysql-port", "3306",
"The mysql server connection address. None, if empty.",
)
flag.StringVar(&cfg.mysqlUsername, "mysql-username", "root",
"The mysql server connection username. None, if empty.",
)
flag.StringVar(&cfg.mysqlPassword, "mysql-password", "",
"The mysql server connection password. None, if empty.",
)
flag.StringVar(&cfg.mysqlDatabase, "mysql-database", "monitor",
"The mysql server connection database. None, if empty.",
)
flag.StringVar(&cfg.graphiteAddress, "graphite-address", "",
"The host:port of the Graphite server to send samples to. None, if empty.",
)
Expand Down Expand Up @@ -179,6 +200,10 @@ func buildClients(cfg *config) ([]writer, []reader) {
writers = append(writers, c)
readers = append(readers, c)
}
if cfg.mysqlAddress != "" {
c := mysql.NewClient(cfg.mysqlAddress, cfg.mysqlPort, cfg.mysqlUsername, cfg.mysqlPassword, cfg.mysqlDatabase)
writers = append(writers, c)
}
return writers, readers
}

Expand Down
206 changes: 206 additions & 0 deletions mysql/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package mysql

import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"io/ioutil"
"strconv"
"strings"
)

type Client struct {
Address string
Port string
Username string
Password string
Database string
}

func NewClient(address string, port string, username string, password string, database string) *Client {

return &Client{
Address: address,
Port: port,
Username: username,
Password: password,
Database: database,
}
}

func (client *Client) Name() string {
return "mysql"
}

func (client *Client) Write(samples model.Samples) error {

//Open mysql connections.
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/information_schema", client.Username, client.Password, client.Address, client.Port))
if err != nil {
return err
}
defer db.Close()

//Parse schema properties.
schemas, _ := client.ParseYml()
err = client.Schemas(schemas)
if err != nil {
return err
}

//Open MySQL transaction.
trx, err := db.Begin()
if err != nil {
return err
}

for _, sample := range samples {
metric := sample.Metric
value := sample.Value
timestamp := sample.Timestamp

//only write schema tables.
sampleName, ok := metric["__name__"]
if !ok {
log.Infoln("received invalid sample %v", sample)
continue
}

persistentSchemas := schemas.Schemas
if len(persistentSchemas) == 0 {
log.Infoln("There's nothing to persistent schema")
continue
}

var isNeed bool = false
var isNeedSchema Schema
for _, ps := range persistentSchemas {
if strings.Compare(ps.SchemaName, string(sampleName)) == 0 {
isNeed = true
isNeedSchema = ps
break
}
}

if isNeed {
schemaName := isNeedSchema.SchemaName
schemaColumns := isNeedSchema.SchemaColumns

var finalInsert = fmt.Sprintf("INSERT INTO %s.%s", client.Database, schemaName)
var finalColumns = "("
var finalValues = " VALUES("
for key, val := range metric {
for _, sc := range schemaColumns {
column := sc.Column
var suffix string = ","
if strings.Compare(column, string(key)) == 0 {
finalColumns = finalColumns + column + suffix
switch sc.Type {
case "string":
finalValues = finalValues + "'" + string(val) + "'" + suffix
case "long":
finalValues = finalValues + string(val) + suffix
case "float":
finalValues = finalValues + string(val) + suffix
}
}
}
}
finalColumns = finalColumns + "record_timestamp,value"
finalColumns = finalColumns + ")"
finalValues = finalValues + strconv.Itoa(int(timestamp)) + "," + strconv.FormatFloat(float64(value), 'f', 5, 32)
finalValues = finalValues + ")"
finalInsert = finalInsert + finalColumns + finalValues

_, err := trx.Exec(finalInsert)
if err != nil {
log.Infoln("insert record failed %s", finalInsert)
trx.Rollback()
}
}

}
err = trx.Commit()
if err != nil {
return err
}
return nil
}

func (client *Client) ParseYml() (*Schemas, error) {
data, err := ioutil.ReadFile("schema.yml")
if err != nil {
fmt.Println(err.Error())
return nil, err
}
var conf = new(Schemas)
if err := yaml.Unmarshal(data, conf); err != nil {
fmt.Printf("err: %v\n", err)
return nil, err
}
return conf, err
}

func (client *Client) Schemas(schemas *Schemas) error {

//Do nothing if schemas length is zero.
s := schemas.Schemas
if len(s) == 0 {
return nil
}

//Open mysql connection.
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/information_schema", client.Username, client.Password, client.Address, client.Port))
if err != nil {
return err
}
defer db.Close()

//Create target database.
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s CHARACTER SET utf8", client.Database))
if err != nil {
return err
}

//Create table Schemas.
for _, schema := range s {
tableName := schema.SchemaName
var tableSchema = "CREATE TABLE IF NOT EXISTS " + client.Database + "." + tableName + "("
primaryKey := "id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
tableSchema = tableSchema + primaryKey
columns := schema.SchemaColumns
for index, column := range columns {
columnName := column.Column
columnType := column.Type
columnSize := column.Size

var suffix string
if index == len(columns)-1 {
suffix = ""
} else {
suffix = ","
}

switch columnType {
case "string":
columnSchema := fmt.Sprintf("%s VARCHAR(%d)%s", columnName, columnSize, suffix)
tableSchema = tableSchema + columnSchema
case "long":
column_schema := fmt.Sprintf("%s BIGINT%s", columnName, suffix)
tableSchema = tableSchema + column_schema
case "float":
column_schema := fmt.Sprintf("%s float%s", columnName, suffix)
tableSchema = tableSchema + column_schema
}
}
tableSchema = tableSchema + ") ENGINE=InnoDB CHARSET = utf8mb4"
_, err := db.Exec(tableSchema)
if err != nil {
return err
}
}
return nil
}
16 changes: 16 additions & 0 deletions mysql/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mysql

type Column struct {
Column string `yaml:"column"`
Type string `yaml:"type"`
Size int `yaml:"size"`
}

type Schema struct {
SchemaName string `yaml:"schema_name"`
SchemaColumns []Column `yaml:"schema_columns"`
}

type Schemas struct {
Schemas []Schema `yaml:"schemas"`
}
36 changes: 36 additions & 0 deletions mysql/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
schemas:
- schema_name: node_cpu_seconds_total
schema_columns:
- column: cpu
type: string
size: 16
- column: instance
type: string
size: 255
- column: job
type: string
size: 255
- column: mode
type: string
size: 255
- column: record_timestamp
type: long
size: 0
- column: value
type: float
size: 0

- schema_name: scrape_series_added
schema_columns:
- column: instance
type: string
size: 255
- column: job
type: string
size: 255
- column: record_timestamp
type: long
size: 0
- column: value
type: float
size: 0
1 change: 0 additions & 1 deletion vendor/github.com/Sirupsen/logrus/.gitignore

This file was deleted.

8 changes: 0 additions & 8 deletions vendor/github.com/Sirupsen/logrus/.travis.yml

This file was deleted.

Loading