Skip to content

Commit

Permalink
cwhisper: bug fixes and refactoring
Browse files Browse the repository at this point in the history
* refactor cmd/compare.go
* refactor cmd/dump.go
* refactor tests
* fix ooo write for single retention cwhisper files #20
  • Loading branch information
bom-d-van committed Sep 6, 2020
1 parent 2157ac5 commit aec9979
Show file tree
Hide file tree
Showing 7 changed files with 649 additions and 387 deletions.
170 changes: 12 additions & 158 deletions cmd/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ import (
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"sync"
"time"

whisper "github.com/go-graphite/go-whisper"
)
Expand All @@ -33,164 +29,22 @@ func main() {
os.Exit(1)
}

var quarantines [][2]int
if *quarantinesRaw != "" {
for _, q := range strings.Split(*quarantinesRaw, ";") {
var quarantine [2]int
for i, t := range strings.Split(q, ",") {
tim, err := time.Parse("2006-01-02", t)
if err != nil {
panic(err)
}
quarantine[i] = int(tim.Unix())
}
quarantines = append(quarantines, quarantine)
}
}

if *now > 0 {
whisper.Now = func() time.Time {
return time.Unix(int64(*now), 0)
}
}

file1 := flag.Args()[0]
file2 := flag.Args()[1]
oflag := os.O_RDONLY

db1, err := whisper.OpenWithOptions(file1, &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
panic(err)
msg, err := whisper.Compare(
file1, file2,
*now,
*ignoreBuffer,
*quarantinesRaw,
*verbose,
*strict,
*muteThreshold,
)
if len(msg) > 0 {
fmt.Print(msg)
}
db2, err := whisper.OpenWithOptions(file2, &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
panic(err)
}

var bad bool
for index, ret := range db1.Retentions() {
from := int(whisper.Now().Unix()) - ret.MaxRetention() + ret.SecondsPerPoint()*60
until := int(whisper.Now().Unix()) - 3600*8

if *verbose {
fmt.Printf("%d %s: from = %+v until = %+v\n", index, ret, from, until)
}

var dps1, dps2 *whisper.TimeSeries
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

var err error
dps1, err = db1.Fetch(from, until)
if err != nil {
panic(err)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

var err error
dps2, err = db2.Fetch(from, until)
if err != nil {
panic(err)
}
}()

wg.Wait()

if *ignoreBuffer {
{
vals := dps1.Values()
vals[len(vals)-1] = math.NaN()
vals[len(vals)-2] = math.NaN()
}
{
vals := dps2.Values()
vals[len(vals)-1] = math.NaN()
vals[len(vals)-2] = math.NaN()
}
}

for _, quarantine := range quarantines {
qfrom := quarantine[0]
quntil := quarantine[1]
if from <= qfrom && qfrom <= until {
qfromIndex := (qfrom - from) / ret.SecondsPerPoint()
quntilIndex := (quntil - from) / ret.SecondsPerPoint()
{
vals := dps1.Values()
for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ {
vals[i] = math.NaN()
}
}
{
vals := dps2.Values()
for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ {
vals[i] = math.NaN()
}
}
}
}

var vals1, vals2 int
for _, p := range dps1.Values() {
if !math.IsNaN(p) {
vals1++
}
}
for _, p := range dps2.Values() {
if !math.IsNaN(p) {
vals2++
}
}

fmt.Printf(" len1 = %d len2 = %d vals1 = %d vals2 = %d\n", len(dps1.Values()), len(dps2.Values()), vals1, vals2)

if len(dps1.Values()) != len(dps2.Values()) {
bad = true
fmt.Printf(" size doesn't match: %d != %d\n", len(dps1.Values()), len(dps2.Values()))
}
if vals1 != vals2 {
bad = true
fmt.Printf(" values doesn't match: %d != %d (%d)\n", vals1, vals2, vals1-vals2)
}
var ptDiff int
for i, p1 := range dps1.Values() {
if len(dps2.Values()) < i {
break
}
p2 := dps2.Values()[i]
if !((math.IsNaN(p1) && math.IsNaN(p2)) || p1 == p2) {
bad = true
ptDiff++
if *verbose {
fmt.Printf(" %d: %d %v != %v\n", i, dps1.FromTime()+i*ret.SecondsPerPoint(), p1, p2)
}
}
}
fmt.Printf(" point mismatches: %d\n", ptDiff)
if ptDiff <= *muteThreshold && !*strict {
bad = false
}
}
if db1.IsCompressed() {
if err := db1.CheckIntegrity(); err != nil {
fmt.Printf("integrity: %s\n%s", file1, err)
bad = true
}
}
if db2.IsCompressed() {
if err := db2.CheckIntegrity(); err != nil {
fmt.Printf("integrity: %s\n%s", file2, err)
bad = true
}
}

if bad {
fmt.Print(err)
os.Exit(1)
}
}
16 changes: 10 additions & 6 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
Expand All @@ -17,21 +18,24 @@ func main() {
noLess := flag.Bool("no-less", false, "Don't use less, print everything to stdout.")
flag.Parse()

oflag := os.O_RDONLY
db, err := whisper.OpenWithOptions(flag.Args()[0], &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}

less := exec.Command("less")
if !*noLess {
less.Stdout = os.Stdout
temp, err := ioutil.TempFile("", "")
if err != nil {
panic(err)
fmt.Println(err.Error())
os.Exit(1)
}
os.Stdout = temp
}

oflag := os.O_RDONLY
db, err := whisper.OpenWithOptions(flag.Args()[0], &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
panic(err)
}
db.Dump(!*header, *debug)

if !*noLess {
Expand Down
84 changes: 75 additions & 9 deletions cmd/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,34 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strconv"
"strings"
"time"

whisper "github.com/go-graphite/go-whisper"
)

func main() {
ignoreNow := flag.Bool("ignore-now", false, "ignore now on write (always write to the base/first archive)")
schema := flag.String("schema", "", "create a new whisper file using the schema if file not found: 1s2d:1m:31d:1h:10y;avg")
xFilesFactor := flag.Float64("xfiles-factor", 0.0, "xfiles factor used for creating new whisper file")
delimiter := flag.String("d", ",", "delimiter of data points")
compressed := flag.Bool("compressed", false, "use compressed format")
randChunk := flag.Int("rand-chunk", 0, "randomize input size with limit for triggering extensions and simulating real life writes.")
ppb := flag.Int("ppb", whisper.DefaultPointsPerBlock, "points per block")
flag.Parse()

var body string
if len(os.Args) < 2 {
if len(flag.Args()) < 1 {
fmt.Println("write: write data points to a whisper file.\nwrite file.wsp [1572940800:3,1572940801:5]\n")
os.Exit(1)
} else if len(os.Args) > 2 {
body = os.Args[2]
} else if len(flag.Args()) > 1 {
body = flag.Args()[1]
} else {
in, err := ioutil.ReadAll(os.Stdin)
if err != nil {
Expand All @@ -27,13 +39,67 @@ func main() {
body = string(in)
}

db, err := whisper.OpenWithOptions(os.Args[1], &whisper.Options{FLock: true})
filename := flag.Args()[0]
db, err := whisper.OpenWithOptions(filename, &whisper.Options{FLock: true, IgnoreNowOnWrite: *ignoreNow})
if err != nil {
panic(err)
if !os.IsNotExist(err) {
fmt.Printf("failed to open file: %s\n", err)
os.Exit(1)
}
if *schema == "" {
fmt.Println("file not found")
os.Exit(1)
}

specs := strings.Split(*schema, ";")
if len(specs) != 2 {
fmt.Printf("illegal schema: %s example: retention;aggregation\n", *schema)
os.Exit(1)
}
rets, err := whisper.ParseRetentionDefs(specs[0])
if err != nil {
fmt.Printf("failed to parse retentions: %s\n", err)
os.Exit(1)
}
aggregationMethod := whisper.ParseAggregationMethod(specs[1])
if aggregationMethod == whisper.Unknown {
fmt.Printf("unknow aggregation method: %s\n", specs[1])
os.Exit(1)
}

db, err = whisper.CreateWithOptions(
filename, rets, aggregationMethod, float32(*xFilesFactor),
&whisper.Options{
Compressed: *compressed,
IgnoreNowOnWrite: *ignoreNow,
PointsPerBlock: *ppb,
},
)
if err != nil {
fmt.Printf("failed to create new whisper file: %s\n", err)
os.Exit(1)
}
}

if err := db.UpdateMany(parse(body)); err != nil {
panic(err)
rand.Seed(time.Now().Unix())

dps := parse(body, *delimiter)
if *randChunk > 0 {
for i := 0; i < len(dps); {
// end := i + rand.Intn(*randChunk) + 1
end := i + *randChunk + 1
if end > len(dps) {
end = len(dps)
}
if err := db.UpdateMany(dps[i:end]); err != nil {
panic(err)
}
i = end
}
} else {
if err := db.UpdateMany(dps); err != nil {
panic(err)
}
}

if err := db.Close(); err != nil {
Expand All @@ -45,9 +111,9 @@ func main() {
}
}

func parse(str string) []*whisper.TimeSeriesPoint {
func parse(str, delimiter string) []*whisper.TimeSeriesPoint {
var ps []*whisper.TimeSeriesPoint
for _, p := range strings.Split(str, ",") {
for _, p := range strings.Split(str, delimiter) {
p = strings.TrimSpace(p)
if p == "" {
continue
Expand Down
Loading

0 comments on commit aec9979

Please sign in to comment.