Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

D1 analytics #8

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ repos:
- id: debug-statements

- repo: https://github.com/golangci/golangci-lint
rev: v1.58.0
rev: v1.59.1
hooks:
- id: golangci-lint
102 changes: 102 additions & 0 deletions cloudflare.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (

var (
	graphqlClient *graphql.Client
	// d1IDToName memoizes D1 database-ID → database-name lookups used when
	// the d1-use-names label mode is enabled, so the name API is hit at most
	// once per database across scrapes.
	// NOTE(review): this map is read and written from per-account goroutines
	// without synchronization — confirm account fetches are serialized or
	// guard it with a mutex.
	d1IDToName = map[string]string{}
)

type cloudflareResponse struct {
Expand Down Expand Up @@ -78,6 +79,41 @@ type cloudflareResponseR2Account struct {
}
}

// d1AccountResp is the per-account payload of the D1 GraphQL response:
// query analytics and storage usage, each grouped by database ID.
type d1AccountResp struct {
	// D1AnalyticsAdaptiveGroups carries query/row counters (sums) and
	// batch-latency / response-size quantiles per database.
	D1AnalyticsAdaptiveGroups []struct {
		Dimensions struct {
			DatabaseID string `json:"databaseId"`
		} `json:"dimensions"`
		Quantiles struct {
			QueryBatchResponseBytesP50 int     `json:"queryBatchResponseBytesP50"`
			QueryBatchResponseBytesP90 int     `json:"queryBatchResponseBytesP90"`
			QueryBatchTimeMsP50        float64 `json:"queryBatchTimeMsP50"`
			QueryBatchTimeMsP90        float64 `json:"queryBatchTimeMsP90"`
		} `json:"quantiles"`
		Sum struct {
			QueryBatchResponseBytes int `json:"queryBatchResponseBytes"`
			ReadQueries             int `json:"readQueries"`
			RowsRead                int `json:"rowsRead"`
			RowsWritten             int `json:"rowsWritten"`
			WriteQueries            int `json:"writeQueries"`
		} `json:"sum"`
	} `json:"d1AnalyticsAdaptiveGroups"`
	// D1StorageAdaptiveGroups carries the maximum observed database size
	// (bytes) per database for the queried day.
	D1StorageAdaptiveGroups []struct {
		Dimensions struct {
			DatabaseID string `json:"databaseId"`
		} `json:"dimensions"`
		Max struct {
			DatabaseSizeBytes int `json:"databaseSizeBytes"`
		} `json:"max"`
	} `json:"d1StorageAdaptiveGroups"`
}

// cloudflareResponseD1Account is the top-level envelope of the D1 GraphQL
// query: one d1AccountResp per account matched by the accountTag filter.
type cloudflareResponseD1Account struct {
	Viewer struct {
		Accounts []d1AccountResp `json:"accounts"`
	}
}

type cloudflareResponseLogpushZone struct {
Viewer struct {
Zones []logpushResponse `json:"zones"`
Expand Down Expand Up @@ -808,6 +844,72 @@ func fetchR2Account(accountID string) (*cloudflareResponseR2Account, error) {
return &resp, nil
}

// fetchD1Account queries the Cloudflare GraphQL Analytics API for the D1
// databases of the given account: query analytics (read/write counts, rows,
// latency and response-size quantiles) and maximum storage size, for the
// current UTC day.
//
// Authentication uses cf_api_token when configured, otherwise the legacy
// cf_api_email / cf_api_key header pair. scrape_delay shifts the reference
// time backwards so recently written (possibly incomplete) analytics
// buckets are not read.
func fetchD1Account(accountID string) (*cloudflareResponseD1Account, error) {
	// Both D1 datasets are filtered at day granularity, so only the date of
	// the delayed "now" matters; no sub-day truncation is needed.
	now := time.Now().Add(-time.Duration(viper.GetInt("scrape_delay")) * time.Second).UTC()

	request := graphql.NewRequest(`query($accountID: String!, $limit: Int!, $date: String!,) {
	viewer {
		accounts(filter: { accountTag: $accountID }) {
			d1AnalyticsAdaptiveGroups(filter: {
				date: $date
				}
				limit: $limit) {
				dimensions {
					databaseId
				}
				quantiles {
					queryBatchResponseBytesP50
					queryBatchResponseBytesP90
					queryBatchTimeMsP50
					queryBatchTimeMsP90
				}
				sum {
					queryBatchResponseBytes
					readQueries
					rowsRead
					rowsWritten
					writeQueries
				}
			}
			d1StorageAdaptiveGroups(filter: {
				date: $date
				}
				limit: $limit
			) {
				max {
					databaseSizeBytes
				}
				dimensions {
					databaseId
					date
				}
			}
		}
	}
}`)

	if len(viper.GetString("cf_api_token")) > 0 {
		request.Header.Set("Authorization", "Bearer "+viper.GetString("cf_api_token"))
	} else {
		request.Header.Set("X-AUTH-EMAIL", viper.GetString("cf_api_email"))
		request.Header.Set("X-AUTH-KEY", viper.GetString("cf_api_key"))
	}

	request.Var("accountID", accountID)
	request.Var("limit", 9999)
	request.Var("date", now.Format("2006-01-02"))

	ctx := context.Background()
	var resp cloudflareResponseD1Account
	if err := graphqlClient.Run(ctx, request, &resp); err != nil {
		log.Errorf("Error fetching D1 account: %s", err)
		return nil, err
	}
	return &resp, nil
}

func findZoneAccountName(zones []cloudflare.Zone, ID string) (string, string) {
for _, z := range zones {
if z.ID == ID {
Expand Down
15 changes: 11 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func fetchMetrics() {
go fetchWorkerAnalytics(a, &wg)
go fetchLogpushAnalyticsForAccount(a, &wg)
go fetchR2StorageForAccount(a, &wg)
go fetchD1StorageForAccount(a, &wg)
}

// Make requests in groups of cfgBatchSize to avoid rate limit
Expand Down Expand Up @@ -149,8 +150,10 @@ func runExporter() {

if len(viper.GetString("cf_api_token")) > 0 {
cloudflareAPI, err = cloudflare.NewWithAPIToken(viper.GetString("cf_api_token"))
log.Debug("Using API Token")
} else {
cloudflareAPI, err = cloudflare.New(viper.GetString("cf_api_key"), viper.GetString("cf_api_email"))
log.Warn("Using API Key and Email. Consider using API Token instead.")
}
if err != nil {
log.Fatalf("Error creating Cloudflare API client: %s", err)
Expand All @@ -159,9 +162,9 @@ func runExporter() {
graphqlClient = graphql.NewClient(cfGraphQLEndpoint)

if len(viper.GetString("cf_api_token")) > 0 {
status, err := cloudflareAPI.VerifyAPIToken(context.Background())
if err != nil {
log.Fatalf("Error creating Cloudflare API client: %s", err)
status, verifyErr := cloudflareAPI.VerifyAPIToken(context.Background())
if verifyErr != nil {
log.Fatalf("Error verifying API client: %s", verifyErr)
}
log.Debugf("API Token status: %s", status.Status)
}
Expand All @@ -182,7 +185,7 @@ func runExporter() {
go fetchMetrics()
}
}()
goVersion := ""
goVersion := "unknown"
if goBuildInfo, available := debug.ReadBuildInfo(); available {
goVersion = goBuildInfo.GoVersion
}
Expand Down Expand Up @@ -273,6 +276,10 @@ func main() {
viper.BindEnv("log_level")
viper.SetDefault("log_level", "info")

flags.Bool("d1-use-names", false, "use names instead of IDs for D1 storage. Requires D1:Read permission")
viper.BindEnv("d1-use-names")
viper.SetDefault("d1-use-names", false)

viper.BindPFlags(flags)
cmd.Execute()
}
153 changes: 150 additions & 3 deletions prometheus.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package main

import (
"context"
"fmt"
"strconv"
"strings"
"sync"

log "github.com/sirupsen/logrus"

"github.com/biter777/countries"
cloudflare "github.com/cloudflare/cloudflare-go"
"github.com/cloudflare/cloudflare-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -54,6 +57,15 @@ const (
logpushFailedJobsZoneMetricName MetricName = "cloudflare_logpush_failed_jobs_zone_count"
r2StorageMetricName MetricName = "cloudflare_r2_storage_bytes"
r2OperationMetricName MetricName = "cloudflare_r2_operation_count"
d1SizeMetricName MetricName = "cloudflare_d1_size_bytes"
d1ReadQueriesMetricName MetricName = "cloudflare_d1_read_queries_count"
d1WriteQueriesMetricName MetricName = "cloudflare_d1_write_queries_count"
d1RowsReadMetricName MetricName = "cloudflare_d1_rows_read_count"
d1RowsWrittenMetricName MetricName = "cloudflare_d1_rows_written_count"
d1QueryBatchTimeP50MetricName MetricName = "cloudflare_d1_query_batch_time_p50"
d1QueryBatchTimeP90MetricName MetricName = "cloudflare_d1_query_batch_time_p90"
d1QueryBatchResponseBytesP50MetricName MetricName = "cloudflare_d1_query_batch_response_p50_bytes"
d1QueryBatchResponseBytesP90MetricName MetricName = "cloudflare_d1_query_batch_response_p90_bytes"
)

type MetricsSet map[MetricName]struct{}
Expand Down Expand Up @@ -280,6 +292,51 @@ var (
Name: r2OperationMetricName.String(),
Help: "Number of operations performed by R2",
}, []string{"account", "bucket", "operation"})

d1Storage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1SizeMetricName.String(),
Help: "Storage used by D1",
}, []string{"account", "database"})

d1ReadQueries = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1ReadQueriesMetricName.String(),
Help: "Number of read queries performed by D1",
}, []string{"account", "database"})

d1WriteQueries = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1WriteQueriesMetricName.String(),
Help: "Number of write queries performed by D1",
}, []string{"account", "database"})

d1RowsRead = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1RowsReadMetricName.String(),
Help: "Number of rows read by D1",
}, []string{"account", "database"})

d1RowsWritten = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1RowsWrittenMetricName.String(),
Help: "Number of rows written by D1",
}, []string{"account", "database"})

d1QueryBatchTimeP50 = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1QueryBatchTimeP50MetricName.String(),
Help: "Query batch response time in milliseconds (50th percentile).",
}, []string{"account", "database"})

d1QueryBatchTimeP90 = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1QueryBatchTimeP90MetricName.String(),
Help: "Query batch response time in milliseconds (90th percentile).",
}, []string{"account", "database"})

d1QueryBatchResponseBytesP50 = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1QueryBatchResponseBytesP50MetricName.String(),
Help: "The total number of bytes in the response, including all returned rows and metadata (50th percentile).",
}, []string{"account", "database"})

d1QueryBatchResponseBytesP90 = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: d1QueryBatchResponseBytesP90MetricName.String(),
Help: "The total number of bytes in the response, including all returned rows and metadata (90th percentile).",
}, []string{"account", "database"})
)

func buildAllMetricsSet() MetricsSet {
Expand Down Expand Up @@ -318,6 +375,15 @@ func buildAllMetricsSet() MetricsSet {
allMetricsSet.Add(logpushFailedJobsZoneMetricName)
allMetricsSet.Add(r2OperationMetricName)
allMetricsSet.Add(r2StorageMetricName)
allMetricsSet.Add(d1SizeMetricName)
allMetricsSet.Add(d1ReadQueriesMetricName)
allMetricsSet.Add(d1WriteQueriesMetricName)
allMetricsSet.Add(d1RowsReadMetricName)
allMetricsSet.Add(d1RowsWrittenMetricName)
allMetricsSet.Add(d1QueryBatchTimeP50MetricName)
allMetricsSet.Add(d1QueryBatchTimeP90MetricName)
allMetricsSet.Add(d1QueryBatchResponseBytesP50MetricName)
allMetricsSet.Add(d1QueryBatchResponseBytesP90MetricName)
return allMetricsSet
}

Expand Down Expand Up @@ -437,8 +503,32 @@ func mustRegisterMetrics(deniedMetrics MetricsSet) {
if !deniedMetrics.Has(r2OperationMetricName) {
prometheus.MustRegister(r2Operation)
}
if !deniedMetrics.Has(buildInfoMetricName) {
prometheus.MustRegister(buildInfo)
if !deniedMetrics.Has(d1SizeMetricName) {
prometheus.MustRegister(d1Storage)
}
if !deniedMetrics.Has(d1ReadQueriesMetricName) {
prometheus.MustRegister(d1ReadQueries)
}
if !deniedMetrics.Has(d1WriteQueriesMetricName) {
prometheus.MustRegister(d1WriteQueries)
}
if !deniedMetrics.Has(d1RowsReadMetricName) {
prometheus.MustRegister(d1RowsRead)
}
if !deniedMetrics.Has(d1RowsWrittenMetricName) {
prometheus.MustRegister(d1RowsWritten)
}
if !deniedMetrics.Has(d1QueryBatchTimeP50MetricName) {
prometheus.MustRegister(d1QueryBatchTimeP50)
}
if !deniedMetrics.Has(d1QueryBatchTimeP90MetricName) {
prometheus.MustRegister(d1QueryBatchTimeP90)
}
if !deniedMetrics.Has(d1QueryBatchResponseBytesP50MetricName) {
prometheus.MustRegister(d1QueryBatchResponseBytesP50)
}
if !deniedMetrics.Has(d1QueryBatchResponseBytesP90MetricName) {
prometheus.MustRegister(d1QueryBatchResponseBytesP90)
}
}

Expand Down Expand Up @@ -507,6 +597,7 @@ func fetchR2StorageForAccount(account cloudflare.Account, wg *sync.WaitGroup) {
if err != nil {
return
}
//accountName := strings.ToLower(strings.ReplaceAll(account.Name, " ", "-"))
for _, acc := range r.Viewer.Accounts {
for _, bucket := range acc.R2StorageGroups {
r2Storage.With(prometheus.Labels{"account": account.Name, "bucket": bucket.Dimensions.BucketName}).Set(float64(bucket.Max.PayloadSize))
Expand All @@ -517,6 +608,62 @@ func fetchR2StorageForAccount(account cloudflare.Account, wg *sync.WaitGroup) {
}
}

// d1MetricLabel returns the metric label for a D1 database: the database
// name (fetched via the API and memoized in d1IDToName) when d1-use-names
// is enabled, otherwise the raw database ID. A non-nil error means the
// name lookup failed and the caller should skip this database.
//
// NOTE(review): d1IDToName is a package-level map mutated here while each
// account is fetched in its own goroutine — if accounts are scraped
// concurrently this is a data race; protect the map with a mutex.
func d1MetricLabel(accountID, databaseID string) (string, error) {
	if !viper.GetBool("d1-use-names") {
		return databaseID, nil
	}
	if name := d1IDToName[databaseID]; name != "" {
		return name, nil
	}
	db, err := cloudflareAPI.GetD1Database(context.Background(), cloudflare.AccountIdentifier(accountID), databaseID)
	if err != nil {
		log.Errorf("Error fetching D1 database name: %v", err)
		return "", err
	}
	d1IDToName[databaseID] = db.Name
	return db.Name, nil
}

// fetchD1StorageForAccount exports D1 storage and query-analytics metrics
// for every database in the account. It is launched as one goroutine per
// account and signals completion through wg.
//
// NOTE(review): wg.Add(1) runs inside the goroutine, so the caller's
// wg.Wait() can return before this goroutine registers itself; the Add
// should happen before the `go` statement in the caller.
func fetchD1StorageForAccount(account cloudflare.Account, wg *sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	r, err := fetchD1Account(account.ID)
	if err != nil {
		return
	}
	accountName := strings.ToLower(strings.ReplaceAll(account.Name, " ", "-"))

	for _, acc := range r.Viewer.Accounts {
		for _, database := range acc.D1StorageAdaptiveGroups {
			label, labelErr := d1MetricLabel(account.ID, database.Dimensions.DatabaseID)
			if labelErr != nil {
				// Name resolution failed; skip this database's metrics.
				continue
			}
			d1Storage.With(prometheus.Labels{"account": accountName, "database": label}).Set(float64(database.Max.DatabaseSizeBytes))
		}
		for _, database := range acc.D1AnalyticsAdaptiveGroups {
			label, labelErr := d1MetricLabel(account.ID, database.Dimensions.DatabaseID)
			if labelErr != nil {
				continue
			}
			labels := prometheus.Labels{"account": accountName, "database": label}
			d1ReadQueries.With(labels).Set(float64(database.Sum.ReadQueries))
			d1WriteQueries.With(labels).Set(float64(database.Sum.WriteQueries))
			d1RowsRead.With(labels).Set(float64(database.Sum.RowsRead))
			d1RowsWritten.With(labels).Set(float64(database.Sum.RowsWritten))
			d1QueryBatchTimeP50.With(labels).Set(database.Quantiles.QueryBatchTimeMsP50)
			d1QueryBatchTimeP90.With(labels).Set(database.Quantiles.QueryBatchTimeMsP90)
			d1QueryBatchResponseBytesP50.With(labels).Set(float64(database.Quantiles.QueryBatchResponseBytesP50))
			d1QueryBatchResponseBytesP90.With(labels).Set(float64(database.Quantiles.QueryBatchResponseBytesP90))
		}
	}
}

func fetchLogpushAnalyticsForZone(zones []cloudflare.Zone, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
Expand Down
Loading