Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cwhisper: bug fixes and refactoring #23

Merged
merged 3 commits into from
Sep 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 12 additions & 158 deletions cmd/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ import (
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"sync"
"time"

whisper "github.com/go-graphite/go-whisper"
)
Expand All @@ -33,164 +29,22 @@ func main() {
os.Exit(1)
}

var quarantines [][2]int
if *quarantinesRaw != "" {
for _, q := range strings.Split(*quarantinesRaw, ";") {
var quarantine [2]int
for i, t := range strings.Split(q, ",") {
tim, err := time.Parse("2006-01-02", t)
if err != nil {
panic(err)
}
quarantine[i] = int(tim.Unix())
}
quarantines = append(quarantines, quarantine)
}
}

if *now > 0 {
whisper.Now = func() time.Time {
return time.Unix(int64(*now), 0)
}
}

file1 := flag.Args()[0]
file2 := flag.Args()[1]
oflag := os.O_RDONLY

db1, err := whisper.OpenWithOptions(file1, &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
panic(err)
msg, err := whisper.Compare(
file1, file2,
*now,
*ignoreBuffer,
*quarantinesRaw,
*verbose,
*strict,
*muteThreshold,
)
if len(msg) > 0 {
fmt.Print(msg)
}
db2, err := whisper.OpenWithOptions(file2, &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
panic(err)
}

var bad bool
for index, ret := range db1.Retentions() {
from := int(whisper.Now().Unix()) - ret.MaxRetention() + ret.SecondsPerPoint()*60
until := int(whisper.Now().Unix()) - 3600*8

if *verbose {
fmt.Printf("%d %s: from = %+v until = %+v\n", index, ret, from, until)
}

var dps1, dps2 *whisper.TimeSeries
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

var err error
dps1, err = db1.Fetch(from, until)
if err != nil {
panic(err)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

var err error
dps2, err = db2.Fetch(from, until)
if err != nil {
panic(err)
}
}()

wg.Wait()

if *ignoreBuffer {
{
vals := dps1.Values()
vals[len(vals)-1] = math.NaN()
vals[len(vals)-2] = math.NaN()
}
{
vals := dps2.Values()
vals[len(vals)-1] = math.NaN()
vals[len(vals)-2] = math.NaN()
}
}

for _, quarantine := range quarantines {
qfrom := quarantine[0]
quntil := quarantine[1]
if from <= qfrom && qfrom <= until {
qfromIndex := (qfrom - from) / ret.SecondsPerPoint()
quntilIndex := (quntil - from) / ret.SecondsPerPoint()
{
vals := dps1.Values()
for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ {
vals[i] = math.NaN()
}
}
{
vals := dps2.Values()
for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ {
vals[i] = math.NaN()
}
}
}
}

var vals1, vals2 int
for _, p := range dps1.Values() {
if !math.IsNaN(p) {
vals1++
}
}
for _, p := range dps2.Values() {
if !math.IsNaN(p) {
vals2++
}
}

fmt.Printf(" len1 = %d len2 = %d vals1 = %d vals2 = %d\n", len(dps1.Values()), len(dps2.Values()), vals1, vals2)

if len(dps1.Values()) != len(dps2.Values()) {
bad = true
fmt.Printf(" size doesn't match: %d != %d\n", len(dps1.Values()), len(dps2.Values()))
}
if vals1 != vals2 {
bad = true
fmt.Printf(" values doesn't match: %d != %d (%d)\n", vals1, vals2, vals1-vals2)
}
var ptDiff int
for i, p1 := range dps1.Values() {
if len(dps2.Values()) < i {
break
}
p2 := dps2.Values()[i]
if !((math.IsNaN(p1) && math.IsNaN(p2)) || p1 == p2) {
bad = true
ptDiff++
if *verbose {
fmt.Printf(" %d: %d %v != %v\n", i, dps1.FromTime()+i*ret.SecondsPerPoint(), p1, p2)
}
}
}
fmt.Printf(" point mismatches: %d\n", ptDiff)
if ptDiff <= *muteThreshold && !*strict {
bad = false
}
}
if db1.IsCompressed() {
if err := db1.CheckIntegrity(); err != nil {
fmt.Printf("integrity: %s\n%s", file1, err)
bad = true
}
}
if db2.IsCompressed() {
if err := db2.CheckIntegrity(); err != nil {
fmt.Printf("integrity: %s\n%s", file2, err)
bad = true
}
}

if bad {
fmt.Print(err)
os.Exit(1)
}
}
16 changes: 10 additions & 6 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"flag"
"fmt"
"io/ioutil"
"os"
"os/exec"
Expand All @@ -17,21 +18,24 @@ func main() {
noLess := flag.Bool("no-less", false, "Don't use less, print everything to stdout.")
flag.Parse()

oflag := os.O_RDONLY
db, err := whisper.OpenWithOptions(flag.Args()[0], &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}

less := exec.Command("less")
if !*noLess {
less.Stdout = os.Stdout
temp, err := ioutil.TempFile("", "")
if err != nil {
panic(err)
fmt.Println(err.Error())
os.Exit(1)
}
os.Stdout = temp
}

oflag := os.O_RDONLY
db, err := whisper.OpenWithOptions(flag.Args()[0], &whisper.Options{OpenFileFlag: &oflag})
if err != nil {
panic(err)
}
db.Dump(!*header, *debug)

if !*noLess {
Expand Down
84 changes: 75 additions & 9 deletions cmd/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,34 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strconv"
"strings"
"time"

whisper "github.com/go-graphite/go-whisper"
)

func main() {
ignoreNow := flag.Bool("ignore-now", false, "ignore now on write (always write to the base/first archive)")
schema := flag.String("schema", "", "create a new whisper file using the schema if file not found: 1s2d:1m:31d:1h:10y;avg")
xFilesFactor := flag.Float64("xfiles-factor", 0.0, "xfiles factor used for creating new whisper file")
delimiter := flag.String("d", ",", "delimiter of data points")
compressed := flag.Bool("compressed", false, "use compressed format")
randChunk := flag.Int("rand-chunk", 0, "randomize input size with limit for triggering extensions and simulating real life writes.")
ppb := flag.Int("ppb", whisper.DefaultPointsPerBlock, "points per block")
flag.Parse()

var body string
if len(os.Args) < 2 {
if len(flag.Args()) < 1 {
fmt.Println("write: write data points to a whisper file.\nwrite file.wsp [1572940800:3,1572940801:5]\n")
os.Exit(1)
} else if len(os.Args) > 2 {
body = os.Args[2]
} else if len(flag.Args()) > 1 {
body = flag.Args()[1]
} else {
in, err := ioutil.ReadAll(os.Stdin)
if err != nil {
Expand All @@ -27,13 +39,67 @@ func main() {
body = string(in)
}

db, err := whisper.OpenWithOptions(os.Args[1], &whisper.Options{FLock: true})
filename := flag.Args()[0]
db, err := whisper.OpenWithOptions(filename, &whisper.Options{FLock: true, IgnoreNowOnWrite: *ignoreNow})
if err != nil {
panic(err)
if !os.IsNotExist(err) {
fmt.Printf("failed to open file: %s\n", err)
os.Exit(1)
}
if *schema == "" {
fmt.Println("file not found")
os.Exit(1)
}

specs := strings.Split(*schema, ";")
if len(specs) != 2 {
fmt.Printf("illegal schema: %s example: retention;aggregation\n", *schema)
os.Exit(1)
}
rets, err := whisper.ParseRetentionDefs(specs[0])
if err != nil {
fmt.Printf("failed to parse retentions: %s\n", err)
os.Exit(1)
}
aggregationMethod := whisper.ParseAggregationMethod(specs[1])
if aggregationMethod == whisper.Unknown {
fmt.Printf("unknow aggregation method: %s\n", specs[1])
os.Exit(1)
}

db, err = whisper.CreateWithOptions(
filename, rets, aggregationMethod, float32(*xFilesFactor),
&whisper.Options{
Compressed: *compressed,
IgnoreNowOnWrite: *ignoreNow,
PointsPerBlock: *ppb,
},
)
if err != nil {
fmt.Printf("failed to create new whisper file: %s\n", err)
os.Exit(1)
}
}

if err := db.UpdateMany(parse(body)); err != nil {
panic(err)
rand.Seed(time.Now().Unix())

dps := parse(body, *delimiter)
if *randChunk > 0 {
for i := 0; i < len(dps); {
// end := i + rand.Intn(*randChunk) + 1
end := i + *randChunk + 1
if end > len(dps) {
end = len(dps)
}
if err := db.UpdateMany(dps[i:end]); err != nil {
panic(err)
}
i = end
}
} else {
if err := db.UpdateMany(dps); err != nil {
panic(err)
}
}

if err := db.Close(); err != nil {
Expand All @@ -45,9 +111,9 @@ func main() {
}
}

func parse(str string) []*whisper.TimeSeriesPoint {
func parse(str, delimiter string) []*whisper.TimeSeriesPoint {
var ps []*whisper.TimeSeriesPoint
for _, p := range strings.Split(str, ",") {
for _, p := range strings.Split(str, delimiter) {
p = strings.TrimSpace(p)
if p == "" {
continue
Expand Down
Loading