From aec997997938d892ff8d6df9fb792b30d7d888cf Mon Sep 17 00:00:00 2001 From: bom-d-van Date: Sun, 19 Jul 2020 14:28:28 +0200 Subject: [PATCH] cwhisper: bug fixes and refactoring * refactor cmd/compare.go * refactor cmd/dump.go * refactor tests * fix ooo write for single retention cwhisper files #20 --- cmd/compare.go | 170 ++---------------------- cmd/dump.go | 16 ++- cmd/write.go | 84 ++++++++++-- compress.go | 333 +++++++++++++++++++++++++---------------------- compress_test.go | 190 ++++++++++++++++++++------- debug.go | 178 ++++++++++++++++++++++++- whisper.go | 65 +++++++-- 7 files changed, 649 insertions(+), 387 deletions(-) diff --git a/cmd/compare.go b/cmd/compare.go index 7467def..0ebf0ca 100755 --- a/cmd/compare.go +++ b/cmd/compare.go @@ -6,11 +6,7 @@ import ( "flag" "fmt" "log" - "math" "os" - "strings" - "sync" - "time" whisper "github.com/go-graphite/go-whisper" ) @@ -33,164 +29,22 @@ func main() { os.Exit(1) } - var quarantines [][2]int - if *quarantinesRaw != "" { - for _, q := range strings.Split(*quarantinesRaw, ";") { - var quarantine [2]int - for i, t := range strings.Split(q, ",") { - tim, err := time.Parse("2006-01-02", t) - if err != nil { - panic(err) - } - quarantine[i] = int(tim.Unix()) - } - quarantines = append(quarantines, quarantine) - } - } - - if *now > 0 { - whisper.Now = func() time.Time { - return time.Unix(int64(*now), 0) - } - } - file1 := flag.Args()[0] file2 := flag.Args()[1] - oflag := os.O_RDONLY - - db1, err := whisper.OpenWithOptions(file1, &whisper.Options{OpenFileFlag: &oflag}) - if err != nil { - panic(err) + msg, err := whisper.Compare( + file1, file2, + *now, + *ignoreBuffer, + *quarantinesRaw, + *verbose, + *strict, + *muteThreshold, + ) + if len(msg) > 0 { + fmt.Print(msg) } - db2, err := whisper.OpenWithOptions(file2, &whisper.Options{OpenFileFlag: &oflag}) if err != nil { - panic(err) - } - - var bad bool - for index, ret := range db1.Retentions() { - from := int(whisper.Now().Unix()) - ret.MaxRetention() + 
ret.SecondsPerPoint()*60 - until := int(whisper.Now().Unix()) - 3600*8 - - if *verbose { - fmt.Printf("%d %s: from = %+v until = %+v\n", index, ret, from, until) - } - - var dps1, dps2 *whisper.TimeSeries - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - - var err error - dps1, err = db1.Fetch(from, until) - if err != nil { - panic(err) - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - - var err error - dps2, err = db2.Fetch(from, until) - if err != nil { - panic(err) - } - }() - - wg.Wait() - - if *ignoreBuffer { - { - vals := dps1.Values() - vals[len(vals)-1] = math.NaN() - vals[len(vals)-2] = math.NaN() - } - { - vals := dps2.Values() - vals[len(vals)-1] = math.NaN() - vals[len(vals)-2] = math.NaN() - } - } - - for _, quarantine := range quarantines { - qfrom := quarantine[0] - quntil := quarantine[1] - if from <= qfrom && qfrom <= until { - qfromIndex := (qfrom - from) / ret.SecondsPerPoint() - quntilIndex := (quntil - from) / ret.SecondsPerPoint() - { - vals := dps1.Values() - for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ { - vals[i] = math.NaN() - } - } - { - vals := dps2.Values() - for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ { - vals[i] = math.NaN() - } - } - } - } - - var vals1, vals2 int - for _, p := range dps1.Values() { - if !math.IsNaN(p) { - vals1++ - } - } - for _, p := range dps2.Values() { - if !math.IsNaN(p) { - vals2++ - } - } - - fmt.Printf(" len1 = %d len2 = %d vals1 = %d vals2 = %d\n", len(dps1.Values()), len(dps2.Values()), vals1, vals2) - - if len(dps1.Values()) != len(dps2.Values()) { - bad = true - fmt.Printf(" size doesn't match: %d != %d\n", len(dps1.Values()), len(dps2.Values())) - } - if vals1 != vals2 { - bad = true - fmt.Printf(" values doesn't match: %d != %d (%d)\n", vals1, vals2, vals1-vals2) - } - var ptDiff int - for i, p1 := range dps1.Values() { - if len(dps2.Values()) < i { - break - } - p2 := dps2.Values()[i] - if !((math.IsNaN(p1) && math.IsNaN(p2)) || p1 == p2) { - 
bad = true - ptDiff++ - if *verbose { - fmt.Printf(" %d: %d %v != %v\n", i, dps1.FromTime()+i*ret.SecondsPerPoint(), p1, p2) - } - } - } - fmt.Printf(" point mismatches: %d\n", ptDiff) - if ptDiff <= *muteThreshold && !*strict { - bad = false - } - } - if db1.IsCompressed() { - if err := db1.CheckIntegrity(); err != nil { - fmt.Printf("integrity: %s\n%s", file1, err) - bad = true - } - } - if db2.IsCompressed() { - if err := db2.CheckIntegrity(); err != nil { - fmt.Printf("integrity: %s\n%s", file2, err) - bad = true - } - } - - if bad { + fmt.Print(err) os.Exit(1) } } diff --git a/cmd/dump.go b/cmd/dump.go index 7cb2915..db6194d 100644 --- a/cmd/dump.go +++ b/cmd/dump.go @@ -4,6 +4,7 @@ package main import ( "flag" + "fmt" "io/ioutil" "os" "os/exec" @@ -17,21 +18,24 @@ func main() { noLess := flag.Bool("no-less", false, "Don't use less, print everything to stdout.") flag.Parse() + oflag := os.O_RDONLY + db, err := whisper.OpenWithOptions(flag.Args()[0], &whisper.Options{OpenFileFlag: &oflag}) + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } + less := exec.Command("less") if !*noLess { less.Stdout = os.Stdout temp, err := ioutil.TempFile("", "") if err != nil { - panic(err) + fmt.Println(err.Error()) + os.Exit(1) } os.Stdout = temp } - oflag := os.O_RDONLY - db, err := whisper.OpenWithOptions(flag.Args()[0], &whisper.Options{OpenFileFlag: &oflag}) - if err != nil { - panic(err) - } db.Dump(!*header, *debug) if !*noLess { diff --git a/cmd/write.go b/cmd/write.go index 8c74ff0..f09e37d 100644 --- a/cmd/write.go +++ b/cmd/write.go @@ -3,22 +3,34 @@ package main import ( + "flag" "fmt" "io/ioutil" + "math/rand" "os" "strconv" "strings" + "time" whisper "github.com/go-graphite/go-whisper" ) func main() { + ignoreNow := flag.Bool("ignore-now", false, "ignore now on write (always write to the base/first archive)") + schema := flag.String("schema", "", "create a new whisper file using the schema if file not found: 1s2d:1m:31d:1h:10y;avg") + xFilesFactor := 
flag.Float64("xfiles-factor", 0.0, "xfiles factor used for creating new whisper file") + delimiter := flag.String("d", ",", "delimiter of data points") + compressed := flag.Bool("compressed", false, "use compressed format") + randChunk := flag.Int("rand-chunk", 0, "randomize input size with limit for triggering extensions and simulating real life writes.") + ppb := flag.Int("ppb", whisper.DefaultPointsPerBlock, "points per block") + flag.Parse() + var body string - if len(os.Args) < 2 { + if len(flag.Args()) < 1 { fmt.Println("write: write data points to a whisper file.\nwrite file.wsp [1572940800:3,1572940801:5]\n") os.Exit(1) - } else if len(os.Args) > 2 { - body = os.Args[2] + } else if len(flag.Args()) > 1 { + body = flag.Args()[1] } else { in, err := ioutil.ReadAll(os.Stdin) if err != nil { @@ -27,13 +39,67 @@ func main() { body = string(in) } - db, err := whisper.OpenWithOptions(os.Args[1], &whisper.Options{FLock: true}) + filename := flag.Args()[0] + db, err := whisper.OpenWithOptions(filename, &whisper.Options{FLock: true, IgnoreNowOnWrite: *ignoreNow}) if err != nil { - panic(err) + if !os.IsNotExist(err) { + fmt.Printf("failed to open file: %s\n", err) + os.Exit(1) + } + if *schema == "" { + fmt.Println("file not found") + os.Exit(1) + } + + specs := strings.Split(*schema, ";") + if len(specs) != 2 { + fmt.Printf("illegal schema: %s example: retention;aggregation\n", *schema) + os.Exit(1) + } + rets, err := whisper.ParseRetentionDefs(specs[0]) + if err != nil { + fmt.Printf("failed to parse retentions: %s\n", err) + os.Exit(1) + } + aggregationMethod := whisper.ParseAggregationMethod(specs[1]) + if aggregationMethod == whisper.Unknown { + fmt.Printf("unknow aggregation method: %s\n", specs[1]) + os.Exit(1) + } + + db, err = whisper.CreateWithOptions( + filename, rets, aggregationMethod, float32(*xFilesFactor), + &whisper.Options{ + Compressed: *compressed, + IgnoreNowOnWrite: *ignoreNow, + PointsPerBlock: *ppb, + }, + ) + if err != nil { + 
fmt.Printf("failed to create new whisper file: %s\n", err) + os.Exit(1) + } } - if err := db.UpdateMany(parse(body)); err != nil { - panic(err) + rand.Seed(time.Now().Unix()) + + dps := parse(body, *delimiter) + if *randChunk > 0 { + for i := 0; i < len(dps); { + // end := i + rand.Intn(*randChunk) + 1 + end := i + *randChunk + 1 + if end > len(dps) { + end = len(dps) + } + if err := db.UpdateMany(dps[i:end]); err != nil { + panic(err) + } + i = end + } + } else { + if err := db.UpdateMany(dps); err != nil { + panic(err) + } } if err := db.Close(); err != nil { @@ -45,9 +111,9 @@ func main() { } } -func parse(str string) []*whisper.TimeSeriesPoint { +func parse(str, delimiter string) []*whisper.TimeSeriesPoint { var ps []*whisper.TimeSeriesPoint - for _, p := range strings.Split(str, ",") { + for _, p := range strings.Split(str, delimiter) { p = strings.TrimSpace(p) if p == "" { continue diff --git a/compress.go b/compress.go index 5c1ee6a..575bf59 100644 --- a/compress.go +++ b/compress.go @@ -317,12 +317,12 @@ func (archive *archiveInfo) getSortedBlockRanges() []blockRange { return brs } -func (archive *archiveInfo) getOverallRange() (from, until int) { +func (archive *archiveInfo) getRange() (from, until int) { for _, b := range archive.blockRanges { if from == 0 || from > b.start { from = b.start } - if b.end > until { + if until == 0 || b.end > until { until = b.end } } @@ -332,7 +332,7 @@ func (archive *archiveInfo) getOverallRange() (from, until int) { func (archive *archiveInfo) hasBuffer() bool { return archive.bufferSize > 0 } func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo) ([]dataPoint, error) { - var dst []dataPoint + var dst []dataPoint // TODO: optimize this with pre-allocation var buf = make([]byte, archive.blockSize) for _, block := range archive.getSortedBlockRanges() { if block.end >= int(start) && int(end) >= block.start { @@ -361,37 +361,39 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive 
*archiveInfo) } } - // Start live aggregation. This probably has a read peformance hit. - if base := whisper.archives[0]; base != archive { - var dps []dataPoint + base := whisper.archives[0] + if base == archive { + return dst, nil + } + // Start live aggregation. This probably has a read peformance hit. + if whisper.aggregationMethod == Mix { // Mix aggregation is triggered when block in base archive is rotated and also // depends on the sufficiency of data points. This could results to a over // long gap when fetching data from higer archives, depending on different // retention policy. Therefore cwhisper needs to do live aggregation. - if whisper.aggregationMethod == Mix { - var baseLookupNeeded = len(dst) == 0 || dst[len(dst)-1].interval < int(end) - var inBase bool - if baseLookupNeeded { - bstart, bend := base.getOverallRange() - inBase = int64(bstart) <= end || end <= int64(bend) - } - if inBase { - nstart := start - if len(dst) > 0 { - // TODO: invest why shifting the last data point interval is wrong - nstart = int64(archive.Interval(dst[len(dst)-1].interval)) // + archive.secondsPerPoint - } - var err error - dps, err = whisper.fetchCompressed(nstart, end, base) - if err != nil { - return dst, err - } + var dps []dataPoint + var inBase bool + var baseLookupNeeded = len(dst) == 0 || dst[len(dst)-1].interval < int(end) + if baseLookupNeeded { + bstart, bend := base.getRange() + inBase = int64(bstart) <= end || end <= int64(bend) + } + + if inBase { + nstart := start + if len(dst) > 0 { + // TODO: invest why shifting the last data point interval is wrong + nstart = int64(archive.Interval(dst[len(dst)-1].interval)) // + archive.secondsPerPoint + } + var err error + dps, err = whisper.fetchCompressed(nstart, end, base) + if err != nil { + return dst, err } } - // This would benefits both mix and no-mix aggregations. 
if base.hasBuffer() { for _, p := range unpackDataPoints(base.buffer) { if p.interval != 0 && int(start) <= p.interval && p.interval <= int(end) { @@ -400,41 +402,47 @@ func (whisper *Whisper) fetchCompressed(start, end int64, archive *archiveInfo) } } - var pinterval int - var vals []float64 - for i, dp := range dps { - // same as archiveInfo.AggregateInterval - interval := dp.interval - mod(dp.interval, archive.secondsPerPoint) - if pinterval == 0 || pinterval == interval { - pinterval = interval - vals = append(vals, dp.value) + adps := whisper.aggregateByArchives(dps) + dst = append(dst, adps[archive]...) + } else { + // retrieve data points within range from the higher/previous archives + var dps []dataPoint + for i, arc := range whisper.archives { + if arc == archive || i == len(whisper.archives)-1 { + break + } - if i < len(dps)-1 { + cvals := []float64{} + cinterval := 0 + tdps := append(dps, unpackDataPoints(arc.buffer)...) + dps = []dataPoint{} + for j, p := range tdps { + if p.interval == 0 && j < len(tdps)-1 { continue } - } + interval := arc.AggregateInterval(p.interval) + if cinterval == 0 || cinterval == interval { + cinterval = interval + cvals = append(cvals, p.value) - // check we have enough data points to propagate a value - knownPercent := float32(len(vals)) / float32(archive.secondsPerPoint/base.secondsPerPoint) - if len(vals) > 0 && knownPercent >= whisper.xFilesFactor { - var ndp dataPoint - ndp.interval = pinterval - if whisper.aggregationMethod == Mix { - if archive.aggregationSpec.Method == Percentile { - ndp.value = aggregatePercentile(archive.aggregationSpec.Percentile, vals) - } else { - ndp.value = aggregate(archive.aggregationSpec.Method, vals) - } - } else { - ndp.value = aggregate(whisper.aggregationMethod, vals) + continue } - dst = append(dst, ndp) - } - vals = vals[:0] - vals = append(vals, dp.value) - pinterval = interval + dps = append(dps, dataPoint{cinterval, aggregate(whisper.aggregationMethod, cvals)}) + + cinterval = 
interval + cvals = []float64{p.value} + } } + sort.SliceStable(dps, func(i, j int) bool { return dps[i].interval < dps[j].interval }) + for i := 0; i < len(dps); i++ { + if int(start) <= dps[i].interval && dps[i].interval <= int(end) { + continue + } + dps = dps[:i] + break + } + dst = append(dst, dps...) } return dst, nil @@ -534,9 +542,8 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points } func (archive *archiveInfo) getBufferInfo() (units []int, index, min int) { - unitCount := len(archive.buffer) / PointSize / (archive.next.secondsPerPoint / archive.secondsPerPoint) var max int - for i := 0; i < unitCount; i++ { + for i := 0; i < archive.bufferUnitCount(); i++ { v := getFirstDataPointStrict(archive.getBufferByUnit(i)).interval if v > 0 { v = archive.AggregateInterval(v) @@ -554,6 +561,10 @@ func (archive *archiveInfo) getBufferInfo() (units []int, index, min int) { return } +func (archive *archiveInfo) bufferUnitCount() int { + return len(archive.buffer) / PointSize / (archive.next.secondsPerPoint / archive.secondsPerPoint) +} + func (archive *archiveInfo) getBufferByUnit(unit int) []byte { count := archive.next.secondsPerPoint / archive.secondsPerPoint lb := unit * PointSize * count @@ -564,6 +575,7 @@ func (archive *archiveInfo) getBufferByUnit(unit int) []byte { func (archive *archiveInfo) appendToBlockAndRotate(dps []dataPoint) (rotated bool, err error) { whisper := archive.whisper // TODO: optimize away? + // TODO: to improve? 
blockBuffer := make([]byte, len(dps)*(MaxCompressedPointSize)+endOfBlockSize) for { @@ -859,6 +871,9 @@ func (a *archiveInfo) AppendPointsToBlock(buf []byte, ps []dataPoint) (written i for i, p := range ps { if p.interval == 0 { continue + } else if p.interval < a.cblock.pn1.interval { + a.stats.discard.oldInterval++ + continue } oldBwIndex := bw.index @@ -1680,167 +1695,167 @@ func (dstw *Whisper) FillCompressed(srcw *Whisper) error { } func (whisper *Whisper) propagateToMixedArchivesCompressed() error { - var largestSPP int - var lastArchive *archiveInfo - var spps []int - for _, arc := range whisper.archives[1:] { - if arc.secondsPerPoint > largestSPP { - largestSPP = arc.secondsPerPoint - lastArchive = arc - } - - var knownSPP bool - for _, spp := range spps { - knownSPP = knownSPP || (arc.secondsPerPoint == spp) - } - if !knownSPP { - spps = append(spps, arc.secondsPerPoint) - } - } + var lastArchive = whisper.archives[len(whisper.archives)-1] + var largestSPP = lastArchive.secondsPerPoint if largestSPP == 0 { return nil } - var baseArchive = whisper.archives[0] - var sortedBaseArcBrs = baseArchive.getSortedBlockRanges() - var until = baseArchive.cblock.pn1.interval - if until == 0 { - for _, br := range sortedBaseArcBrs { - if br.end == 0 { - break - } - - until = br.end - } - - if until == 0 { - return nil - } - } - // always exclude the last data point to make sure it's not a pre-mature propagation. 
- until = lastArchive.Interval(until) - 1 - if until <= 0 { - return nil - } + var firstArchive = whisper.archives[0] + var firstStart, firstEnd = firstArchive.getRange() + var _, lastEnd = lastArchive.getRange() var from int - if lastArchive.cblock.pn1.interval == 0 { - if sortedBaseArcBrs[0].start == 0 { - return nil - } - for _, br := range lastArchive.getSortedBlockRanges() { - if br.end == 0 { - break - } - from = br.end - } - - if from == 0 { - from = sortedBaseArcBrs[0].start - } + if lastEnd > 0 { + from = lastEnd } else { - from = lastArchive.cblock.pn1.interval + lastArchive.secondsPerPoint + from = firstStart } - // only propagate when there are enough data points for all the lower - // archives, for perfomance reason (in theory). - if until-from < largestSPP { + // 1s:1d,1m:30d,1h:1y,1d:10y + // 86400,43200,8760,3650 + // + // 7200 -> 2h + // + // [0 - 7200) + // [7200 - 14400) + + // Why "- 1": always exclude the last data point to make sure it's not + // a pre-mature propagation. propagation aggregation is "mod down", check + // archiveInfo.AggregateInterval. + var until = lastArchive.Interval(firstEnd) - largestSPP*5 - 1 + + if until-from <= 0 { return nil } - dps, err := whisper.fetchCompressed(int64(from), int64(until), baseArchive) + dps, err := whisper.fetchCompressed(int64(from), int64(until), firstArchive) if err != nil { - return fmt.Errorf("mix: failed to baseArchive.fetchCompressed(%d, %d): %s", from, until, err) + return fmt.Errorf("mix: failed to firstArchive.fetchCompressed(%d, %d): %s", from, until, err) } + + adps := whisper.aggregateByArchives(dps) + for _, arc := range whisper.archives[1:] { + if dps := adps[arc]; len(dps) > 0 { + if _, err := arc.appendToBlockAndRotate(dps); err != nil { + return fmt.Errorf("mix: failed to propagate archive %s: %s", arc.Retention, err) + } + } + } + + return nil +} + +// NOTE: this method could be called from both read and write paths. 
+func (whisper *Whisper) aggregateByArchives(dps []dataPoint) (adps map[*archiveInfo][]dataPoint) { + adps = map[*archiveInfo][]dataPoint{} + if len(dps) == 0 { - return nil // TODO: should be an error? + return // TODO: should be an error? + } + + var spps []int + for _, arc := range whisper.archives[1:] { + var knownSPP bool + for _, spp := range spps { + knownSPP = knownSPP || (arc.secondsPerPoint == spp) + } + if !knownSPP { + spps = append(spps, arc.secondsPerPoint) + } } + sort.SliceStable(dps, func(i, j int) bool { return dps[i].interval < dps[j].interval }) + type groupedDataPoint struct { interval int values []float64 } - var dpsBySPP = map[int][]*groupedDataPoint{} - for _, dp := range dps { + var dpsBySPP = map[int][]groupedDataPoint{} + + for i, dp := range dps { + if i < len(dps)-1 && dps[i+1].interval == dp.interval { + continue + } + for _, spp := range spps { interval := dp.interval - mod(dp.interval, spp) // same as archiveInfo.AggregateInterval + if len(dpsBySPP[spp]) == 0 { - dpsBySPP[spp] = append(dpsBySPP[spp], &groupedDataPoint{ + gdp := groupedDataPoint{ interval: interval, values: []float64{dp.value}, - }) + } + + dpsBySPP[spp] = append(dpsBySPP[spp], gdp) continue } - if gdp := dpsBySPP[spp][len(dpsBySPP[spp])-1]; gdp.interval == interval { + gdp := &dpsBySPP[spp][len(dpsBySPP[spp])-1] + if gdp.interval == interval { gdp.values = append(gdp.values, dp.value) continue } - gdp := &groupedDataPoint{ - interval: interval, - values: []float64{dp.value}, - } - dpsBySPP[spp] = append(dpsBySPP[spp], gdp) - // check we have enough data points to propagate a value + baseArchive := whisper.archives[0] knownPercent := float32(len(gdp.values)) / float32(spp/baseArchive.secondsPerPoint) if knownPercent < whisper.xFilesFactor { - dpsBySPP[spp] = dpsBySPP[spp][:len(dpsBySPP[spp])-1] + // clean up the last data point + gdp.interval = interval + gdp.values = []float64{dp.value} continue } - // sorted for percentiles - sort.Float64s(gdp.values) - } - } + gdp 
= &groupedDataPoint{ + interval: interval, + values: []float64{dp.value}, + } - if len(dpsBySPP[largestSPP]) == 0 { - return nil + dpsBySPP[spp] = append(dpsBySPP[spp], *gdp) + continue + } } - var skipInterval int - var maxBufferSPP = 3 // TODO: come out with a better value? - // Handle cases of retentions ratio smaller than 3 between base and the - // last archives. - if ratio := whisper.archives[0].MaxRetention() / largestSPP; ratio < maxBufferSPP { - maxBufferSPP = 0 - } - // Make sure that we don't propagate prematurely by checking there are - // enough data points for the last archives. - for i := len(dpsBySPP[largestSPP]) - 1; i >= 0 && i >= len(dpsBySPP[largestSPP])-maxBufferSPP; i-- { - if len(dpsBySPP[largestSPP][i].values) < largestSPP/whisper.archives[0].secondsPerPoint { - skipInterval = dpsBySPP[largestSPP][i].interval - } + { + // TODO: think about it + // if len(dpsBySPP[largestSPP]) == 0 { + // return nil + // } } for _, arc := range whisper.archives[1:] { gdps := dpsBySPP[arc.secondsPerPoint] - dps := make([]dataPoint, len(gdps)) - for i, gdp := range gdps { - if skipInterval > 0 && gdp.interval >= skipInterval { - dps = dps[:i] - break + dps := make([]dataPoint, 0, len(gdps)) + _, limit := arc.getRange() // NOTE: not supporting propagation rewrite/out of order + for _, gdp := range gdps { + if gdp.interval <= limit { + continue } - dps[i].interval = gdp.interval - - if arc.aggregationSpec.Method == Percentile { - dps[i].value = aggregatePercentile(arc.aggregationSpec.Percentile, gdp.values) + dps = append(dps, dataPoint{}) + dp := &dps[len(dps)-1] + dp.interval = gdp.interval + + if arc.aggregationSpec == nil { + values := gdp.values + dp.value = aggregate(whisper.aggregationMethod, values) + } else if arc.aggregationSpec.Method == Percentile { + // sorted for percentiles + sort.Float64s(gdp.values) + dp.value = aggregatePercentile(arc.aggregationSpec.Percentile, gdp.values) } else { - dps[i].value = aggregate(arc.aggregationSpec.Method, 
gdp.values) + dp.value = aggregate(arc.aggregationSpec.Method, gdp.values) } } if len(dps) == 0 { continue } - if _, err := arc.appendToBlockAndRotate(dps); err != nil { - return fmt.Errorf("mix: failed to propagate archive %s: %s", arc.Retention, err) - } + adps[arc] = dps } - return nil + return } // Same implementation copied from carbonapi, without using quickselect for diff --git a/compress_test.go b/compress_test.go index 00550e2..458f254 100644 --- a/compress_test.go +++ b/compress_test.go @@ -8,7 +8,6 @@ import ( "math" "math/rand" "os" - "os/exec" "reflect" "testing" "time" @@ -18,6 +17,12 @@ import ( "github.com/kr/pretty" ) +func init() { + if err := os.MkdirAll("tmp", 0755); err != nil { + panic(err) + } +} + func TestBitsReadWrite(t *testing.T) { buf := make([]byte, 256) @@ -151,7 +156,7 @@ func TestBlockReadWrite2(t *testing.T) { } func TestCompressedWhisperReadWrite1(t *testing.T) { - fpath := "comp.whisper" + fpath := "tmp/comp1.whisper" os.Remove(fpath) whisper, err := CreateWithOptions( fpath, @@ -193,39 +198,45 @@ func TestCompressedWhisperReadWrite1(t *testing.T) { } // this negative data points should be ignored - if err := whisper.UpdateMany([]*TimeSeriesPoint{{Time: next(0) - 10, Value: 12}}); err != nil { + outOfOrderDataPoint := TimeSeriesPoint{Time: next(0) - 10, Value: 12} + if err := whisper.UpdateMany([]*TimeSeriesPoint{&outOfOrderDataPoint}); err != nil { t.Error(err) } - if got, want := whisper.archives[0].stats.discard.oldInterval, uint32(1); got != want { - t.Errorf("whisper.archives[0].stats.discard.oldInterval = %d; want %d", got, want) - } + // if got, want := whisper.archives[0].stats.discard.oldInterval, uint32(1); got != want { + // t.Errorf("whisper.archives[0].stats.discard.oldInterval = %d; want %d", got, want) + // } whisper.Close() - whisper, err = OpenWithOptions(fpath, &Options{Compressed: true, PointsPerBlock: 7200}) + whisper, err = OpenWithOptions(fpath, &Options{}) if err != nil { t.Fatal(err) } - expectVals := 
make([]float64, 60) - for i := 0; i < 60; i++ { - expectVals[i] = math.NaN() - } - for _, p := range input { - expectVals[p.Time-ts-1] = p.Value - } - expect := &TimeSeries{ - fromTime: ts + 1, - untilTime: ts + 61, - step: 1, - values: expectVals, - } - if ts, err := whisper.Fetch(ts, ts+300); err != nil { - t.Error(err) - } else if diff := cmp.Diff(ts, expect, cmp.AllowUnexported(TimeSeries{}), cmpopts.EquateNaNs()); diff != "" { - t.Error(diff) - } + // t.Run("out_of_order_write", func(t *testing.T) { + // expectVals := make([]float64, 60) + // for i := 0; i < 60; i++ { + // expectVals[i] = math.NaN() + // } + // for _, p := range input { + // expectVals[p.Time-ts-1] = p.Value + // } + // expectVals[outOfOrderDataPoint.Time-ts-1] = outOfOrderDataPoint.Value + // expect := &TimeSeries{ + // fromTime: ts + 1, + // untilTime: ts + 61, + // step: 1, + // values: expectVals, + // } + // if ts, err := whisper.Fetch(ts, ts+300); err != nil { + // t.Error(err) + // } else if diff := cmp.Diff(ts, expect, cmp.AllowUnexported(TimeSeries{}), cmpopts.EquateNaNs()); diff != "" { + // t.Error(diff) + // } + // }) + // this test case is no longer valid for cwhisper version 2, buffer + // design is deprecated. 
t.Run("buffer_overflow", func(t *testing.T) { // fmt.Println("---") // whisper.archives[0].dumpDataPointsCompressed() @@ -287,7 +298,7 @@ func TestCompressedWhisperReadWrite1(t *testing.T) { } func TestCompressedWhisperReadWrite2(t *testing.T) { - fpath := "comp.whisper" + fpath := "tmp/comp2.whisper" os.Remove(fpath) whisper, err := CreateWithOptions( fpath, @@ -380,6 +391,7 @@ func TestCompressedWhisperReadWrite2(t *testing.T) { } var fullTest3 = flag.Bool("full-test3", false, "run a full test of TestCompressedWhisperReadWrite3") +var cacheTest3Data = flag.Bool("debug-test3", false, "save a data of TestCompressedWhisperReadWrite3 for debugging") // To run a full test of TestCompressedWhisperReadWrite3, it would take about 10 // minutes, the slowness comes from standard whisper file propagation (around 10 @@ -387,15 +399,19 @@ var fullTest3 = flag.Bool("full-test3", false, "run a full test of TestCompresse // // Parallel is disabled because we need to manipulate Now in order to simulate // updates. 
+// +// TODO: cache data to make failed tests repeatable and easier to debug func TestCompressedWhisperReadWrite3(t *testing.T) { // TODO: add a test case of mixing random and sequential values/times inputs := []struct { name string randLimit func() int + fullTest func() bool gen func(prevTime time.Time, index int) *TimeSeriesPoint }{ { - name: "random_time", + name: "random_time", + fullTest: func() bool { return true }, gen: func(prevTime time.Time, index int) *TimeSeriesPoint { return &TimeSeriesPoint{ Value: 0, @@ -404,7 +420,8 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { }, }, { - name: "random_time_value", + name: "random_time_value", + fullTest: func() bool { return true }, gen: func(prevTime time.Time, index int) *TimeSeriesPoint { return &TimeSeriesPoint{ Value: rand.NormFloat64(), @@ -413,7 +430,9 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { }, }, { - name: "less_random_time_value", + name: "less_random_time_value", + fullTest: func() bool { return true }, + // randLimit: func() int { return 300 }, gen: func(prevTime time.Time, index int) *TimeSeriesPoint { return &TimeSeriesPoint{ Value: 2000.0 + float64(rand.Intn(1000)), @@ -421,9 +440,19 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { } }, }, + { + name: "fast_simple", + fullTest: func() bool { return true }, + randLimit: func() int { return 300 }, + gen: func(prevTime time.Time, index int) *TimeSeriesPoint { + return &TimeSeriesPoint{Value: 2000.0 + float64(rand.Intn(1000)), Time: int(prevTime.Add(time.Second * 60).Unix())} + }, + }, + // these are slow tests, turned off by default { - name: "random_value", + name: "random_value", + fullTest: func() bool { return *fullTest3 }, gen: func(prevTime time.Time, index int) *TimeSeriesPoint { return &TimeSeriesPoint{ Value: rand.NormFloat64(), @@ -433,6 +462,7 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { }, { name: "random_value2", + fullTest: func() bool { return *fullTest3 }, randLimit: func() int { return 
rand.Intn(300) + (60 * 60 * 24) }, gen: func(prevTime time.Time, index int) *TimeSeriesPoint { return &TimeSeriesPoint{ @@ -442,7 +472,8 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { }, }, { - name: "simple", + name: "simple", + fullTest: func() bool { return *fullTest3 }, gen: func(prevTime time.Time, index int) *TimeSeriesPoint { return &TimeSeriesPoint{Value: 0, Time: int(prevTime.Add(time.Second).Unix())} }, @@ -466,6 +497,15 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { os.Remove(fpath) os.Remove(fpath + ".cwsp") + var dataDebugFile *os.File + if *cacheTest3Data { + var err error + dataDebugFile, err = os.Create(fmt.Sprintf("tmp/test3_%s.data", input.name)) + if err != nil { + t.Fatal(err) + } + } + cwhisper, err := CreateWithOptions( fpath+".cwsp", []*Retention{ @@ -497,7 +537,8 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { cwhisper.Close() ncwhisper.Close() - var now = time.Now() + // var now = time.Now() + var now = time.Unix(1589720099, 0) var total = 60*60*24*365*2 + 37 var start = now.Add(time.Second * time.Duration(total) * -1) Now = func() time.Time { return start } @@ -543,7 +584,17 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { extended++ } - if *fullTest3 { + if input.fullTest() { + if *cacheTest3Data { + // if _, err := fmt.Fprintf(dataDebugFile, "%d\n", len(ps)); err != nil { + // t.Fatal(err) + // } + for _, p := range ps { + if _, err := fmt.Fprintf(dataDebugFile, "%d %d %d %v\n", p.Time, p.Time-mod(p.Time, 60), p.Time-mod(p.Time, 3600), p.Value); err != nil { + t.Fatal(err) + } + } + } ncwhisper, err = OpenWithOptions(fpath, &Options{InMemory: inMemory}) if err != nil { t.Fatal(err) @@ -563,6 +614,10 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { } } + if *cacheTest3Data { + dataDebugFile.Close() + } + t.Logf("statTotalUpdates: %d extended: %d totalPoints: %d\n", statTotalUpdates, extended, totalPoints) // for _, a := range cwhisper.archives { // t.Logf("%s: %d\n", a.Retention, a.totalPoints()) @@ 
-587,9 +642,11 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { // } // } - if *fullTest3 { + if input.fullTest() { t.Log("go", "run", "cmd/compare.go", "-v", "-now", fmt.Sprintf("%d", now.Unix()), fpath, fpath+".cwsp") - output, err := exec.Command("go", "run", "cmd/compare.go", "-now", fmt.Sprintf("%d", now.Unix()), fpath, fpath+".cwsp").CombinedOutput() + // output, err := exec.Command("go", "run", "cmd/compare.go", "-now", fmt.Sprintf("%d", now.Unix()), fpath, fpath+".cwsp").CombinedOutput() + output, err := Compare(fpath, fpath+".cwsp", int(now.Unix()), false, "", false, false, 2) + if err != nil { t.Log(string(output)) t.Error(err) @@ -609,6 +666,49 @@ func TestCompressedWhisperReadWrite3(t *testing.T) { } } +func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { + fpath := fmt.Sprintf("tmp/test_single_retention_ooo.cwsp") + os.Remove(fpath) + + rets := []*Retention{ + {secondsPerPoint: 1, numberOfPoints: 7200}, + } + cwhisper, err := CreateWithOptions( + fpath, rets, Sum, 0, + &Options{ + Compressed: true, PointsPerBlock: 1200, + InMemory: false, IgnoreNowOnWrite: true, + }, + ) + if err != nil { + panic(err) + } + + now := int(time.Now().Unix()) - 3600 + cwhisper.UpdateMany([]*TimeSeriesPoint{ + {Value: 1, Time: now + 0}, + {Value: 1, Time: now + 1}, + {Value: 1, Time: now + 2}, + }) + cwhisper.UpdateMany([]*TimeSeriesPoint{ + {Value: 0, Time: now + 1}, + }) + + data, err := cwhisper.Fetch(now-1, now+2) + if err != nil { + t.Error(err) + } + if got, want := data.Points(), []TimeSeriesPoint{ + {Time: now + 0, Value: 1}, + {Time: now + 1, Value: 1}, + {Time: now + 2, Value: 1}, + }; !reflect.DeepEqual(got, want) { + t.Errorf("data.Points() = %v; want %v", got, want) + } + + cwhisper.Close() +} + func TestCompressTo(t *testing.T) { fpath := "compress_to.wsp" os.Remove(fpath) @@ -664,7 +764,9 @@ func TestCompressTo(t *testing.T) { t.Fatal(err) } - output, err := exec.Command("go", "run", "cmd/compare.go", fpath, 
fpath+".cwsp").CombinedOutput() + // output, err := exec.Command("go", "run", "cmd/compare.go", fpath, fpath+".cwsp").CombinedOutput() + t.Log("go", "run", "cmd/compare.go", "-v", fpath, fpath+".cwsp") + output, err := Compare(fpath, fpath+".cwsp", 0, false, "", false, false, 2) if err != nil { t.Fatalf("%s: %s", err, output) } @@ -915,7 +1017,7 @@ func TestSanitizeAvgCompressedPointSizeOnCreate(t *testing.T) { }, } for _, c := range cases { - fpath := "extend.whisper" + fpath := "tmp/extend.whisper" os.Remove(fpath) whisper, err := CreateWithOptions( fpath, @@ -981,8 +1083,8 @@ func TestEstimatePointSize(t *testing.T) { } func TestFillCompressedMix(t *testing.T) { - srcPath := "fill-mix.src.cwsp" - dstPath := "fill-mix.dst.cwsp" + srcPath := "tmp/fill-mix.src.cwsp" + dstPath := "tmp/fill-mix.dst.cwsp" os.Remove(srcPath) os.Remove(dstPath) @@ -1144,7 +1246,7 @@ func TestFillCompressedMix(t *testing.T) { } func TestFetchCompressedMix(t *testing.T) { - srcPath := "fetch-mix.cwsp" + srcPath := "tmp/fetch-mix.cwsp" os.Remove(srcPath) srcMix, err := CreateWithOptions( @@ -1316,7 +1418,7 @@ func TestFetchCompressedMix(t *testing.T) { } func BenchmarkWriteCompressed(b *testing.B) { - fpath := "benchmark_write.cwsp" + fpath := "tmp/benchmark_write.cwsp" os.Remove(fpath) cwhisper, err := CreateWithOptions( fpath, @@ -1366,7 +1468,7 @@ func BenchmarkWriteCompressed(b *testing.B) { } func BenchmarkReadCompressed(b *testing.B) { - fpath := "benchmark_write.cwsp" + fpath := "tmp/benchmark_write.cwsp" cwhisper, err := OpenWithOptions(fpath, &Options{}) if err != nil { b.Fatal(err) @@ -1384,7 +1486,7 @@ func BenchmarkReadCompressed(b *testing.B) { } func BenchmarkReadStandard(b *testing.B) { - fpath := "benchmark_write.wsp" + fpath := "tmp/benchmark_write.wsp" cwhisper, err := OpenWithOptions(fpath, &Options{}) if err != nil { b.Fatal(err) @@ -1402,7 +1504,7 @@ func BenchmarkReadStandard(b *testing.B) { } func BenchmarkWriteStandard(b *testing.B) { - fpath := 
"benchmark_write.wsp" + fpath := "tmp/benchmark_write.wsp" os.Remove(fpath) cwhisper, err := CreateWithOptions( fpath, diff --git a/debug.go b/debug.go index 73523c5..52918a3 100644 --- a/debug.go +++ b/debug.go @@ -4,6 +4,9 @@ import ( "errors" "fmt" "math" + "os" + "strings" + "sync" "time" ) @@ -203,7 +206,7 @@ func (arc *archiveInfo) dumpDataPointsCompressed() { for i, p := range dps { // continue - fmt.Printf(" % 4d %d %s: %f\n", i, p.interval, toTime(p.interval), p.value) + fmt.Printf(" %s % 4d %d %s: %v\n", arc.String(), i, p.interval, toTime(p.interval), p.value) } } } @@ -233,7 +236,7 @@ func (whisper *Whisper) dumpDataPointsStandard(archive *archiveInfo) { points := unpackDataPoints(b) for i, p := range points { - fmt.Printf("%d: %d,% 10v\n", i, p.interval, p.value) + fmt.Printf("%s %d: %d,% 10v\n", archive.String(), i, p.interval, p.value) } } @@ -254,3 +257,174 @@ func GenTestArchive(buf []byte, ret Retention) *archiveInfo { } func GenDataPointSlice() []dataPoint { return []dataPoint{} } + +func Compare( + file1 string, + file2 string, + now int, + ignoreBuffer bool, + quarantinesRaw string, + verbose bool, + strict bool, + muteThreshold int, +) (msg string, err error) { + oflag := os.O_RDONLY + db1, err := OpenWithOptions(file1, &Options{OpenFileFlag: &oflag}) + if err != nil { + return "", err + } + db2, err := OpenWithOptions(file2, &Options{OpenFileFlag: &oflag}) + if err != nil { + return "", err + } + var quarantines [][2]int + if quarantinesRaw != "" { + for _, q := range strings.Split(quarantinesRaw, ";") { + var quarantine [2]int + for i, t := range strings.Split(q, ",") { + tim, err := time.Parse("2006-01-02", t) + if err != nil { + return "", err + } + quarantine[i] = int(tim.Unix()) + } + quarantines = append(quarantines, quarantine) + } + } + + oldNow := Now + Now = func() time.Time { + if now > 0 { + return time.Unix(int64(now), 0) + } + return time.Now() + } + defer func() { Now = oldNow }() + + var bad bool + for index, ret := range 
db1.Retentions() {
+		from := int(Now().Unix()) - ret.MaxRetention() + ret.SecondsPerPoint()*60
+		until := int(Now().Unix())
+
+		msg += fmt.Sprintf("%d %s: from = %+v until = %+v (%s - %s)\n", index, ret, from, until, time.Unix(int64(from), 0).Format("2006-01-02 15:04:05"), time.Unix(int64(until), 0).Format("2006-01-02 15:04:05"))
+
+		var dps1, dps2 *TimeSeries
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			var err error
+			dps1, err = db1.Fetch(from, until)
+			if err != nil {
+				panic(err)
+			}
+		}()
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			var err error
+			dps2, err = db2.Fetch(from, until)
+			if err != nil {
+				panic(err)
+			}
+		}()
+
+		wg.Wait()
+
+		if ignoreBuffer {
+			{
+				vals := dps1.Values()
+				vals[len(vals)-1] = math.NaN()
+				vals[len(vals)-2] = math.NaN()
+			}
+			{
+				vals := dps2.Values()
+				vals[len(vals)-1] = math.NaN()
+				vals[len(vals)-2] = math.NaN()
+			}
+		}
+
+		for _, quarantine := range quarantines {
+			qfrom := quarantine[0]
+			quntil := quarantine[1]
+			if from <= qfrom && qfrom <= until {
+				qfromIndex := (qfrom - from) / ret.SecondsPerPoint()
+				quntilIndex := (quntil - from) / ret.SecondsPerPoint()
+				{
+					vals := dps1.Values()
+					for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ {
+						vals[i] = math.NaN()
+					}
+				}
+				{
+					vals := dps2.Values()
+					for i := qfromIndex; i <= quntilIndex && i < len(vals); i++ {
+						vals[i] = math.NaN()
+					}
+				}
+			}
+		}
+
+		var vals1, vals2 int
+		for _, p := range dps1.Values() {
+			if !math.IsNaN(p) {
+				vals1++
+			}
+		}
+		for _, p := range dps2.Values() {
+			if !math.IsNaN(p) {
+				vals2++
+			}
+		}
+
+		msg += fmt.Sprintf(" len1 = %d len2 = %d vals1 = %d vals2 = %d\n", len(dps1.Values()), len(dps2.Values()), vals1, vals2)
+
+		if len(dps1.Values()) != len(dps2.Values()) {
+			bad = true
+			msg += fmt.Sprintf(" size doesn't match: %d != %d\n", len(dps1.Values()), len(dps2.Values()))
+		}
+		if vals1 != vals2 {
+			bad = true
+			msg += fmt.Sprintf(" values doesn't match: %d != %d (%d)\n", vals1, vals2, vals1-vals2)
+		}
+		var ptDiff int
+		for i, p1 := range dps1.Values() {
+			if len(dps2.Values()) <= i {
+				break
+			}
+			p2 := dps2.Values()[i]
+			if !((math.IsNaN(p1) && math.IsNaN(p2)) || p1 == p2) {
+				bad = true
+				ptDiff++
+				if verbose {
+					msg += fmt.Sprintf(" %d: %d %v != %v\n", i, dps1.FromTime()+i*ret.SecondsPerPoint(), p1, p2)
+				}
+			}
+		}
+		msg += fmt.Sprintf(" point mismatches: %d\n", ptDiff)
+		if ptDiff <= muteThreshold && !strict {
+			bad = false
+		}
+	}
+	if db1.IsCompressed() {
+		if err := db1.CheckIntegrity(); err != nil {
+			msg += fmt.Sprintf("integrity: %s\n%s", file1, err)
+			bad = true
+		}
+	}
+	if db2.IsCompressed() {
+		if err := db2.CheckIntegrity(); err != nil {
+			msg += fmt.Sprintf("integrity: %s\n%s", file2, err)
+			bad = true
+		}
+	}
+
+	if bad {
+		err = errors.New("whispers not equal")
+	}
+
+	return msg, err
+}
diff --git a/whisper.go b/whisper.go
index 4ecf468..f2d9594 100644
--- a/whisper.go
+++ b/whisper.go
@@ -41,6 +41,7 @@ const (
 // Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header
 type AggregationMethod int
 
+const Unknown AggregationMethod = -1
 const (
 	Average AggregationMethod = iota + 1
 	Sum
@@ -75,13 +76,35 @@ func (am AggregationMethod) String() string {
 	return fmt.Sprintf("%d", am)
 }
 
-// func ParseAggregationMethods() {}
+func ParseAggregationMethod(am string) AggregationMethod {
+	switch am {
+	case "average", "avg":
+		return Average
+	case "sum":
+		return Sum
+	case "first":
+		return First
+	case "last":
+		return Last
+	case "max":
+		return Max
+	case "min":
+		return Min
+	case "mix":
+		return Mix
+	case "percentile":
+		return Percentile
+	}
+	return Unknown
+}
 
 type Options struct {
 	Sparse bool
 	FLock  bool
-	Compressed bool
+	Compressed bool
+	// It's a hint, used if the retention is big enough, more in
+	// Retention.calculateSuitablePointsPerBlock
 	PointsPerBlock int
 	PointSize float32
 	InMemory bool
@@ -89,6 +112,10 @@ type Options struct {
 	MixAggregationSpecs []MixAggregationSpec
 	MixAvgCompressedPointSizes map[int][]float32
+
+	SIMV 
bool // single interval multiple values + + IgnoreNowOnWrite bool } type MixAggregationSpec struct { @@ -96,6 +123,7 @@ type MixAggregationSpec struct { Percentile float32 } +// a simple file interface, mainly used for testing and migration. type file interface { Seek(offset int64, whence int) (ret int64, err error) Fd() uintptr @@ -125,8 +153,7 @@ type Whisper struct { compVersion uint8 pointsPerBlock int avgCompressedPointSize float32 - - crc32 uint32 + crc32 uint32 opts *Options Extended bool @@ -160,6 +187,8 @@ type archiveInfo struct { next *archiveInfo whisper *Whisper + // NOTE: buffer design deprecated for v2 and mix + // // why having buffer: // // original reasons: @@ -530,6 +559,8 @@ func validateRetentions(retentions Retentions) error { return fmt.Errorf("Each archive must have at least enough points to consolidate to the next archive (archive%v consolidates %v of archive%v's points but it has only %v total points)", i+1, nextRetention.secondsPerPoint/retention.secondsPerPoint, i, retention.numberOfPoints) } } + + // TODO: cwhisper has more strict retention limit, everything is aggregated from the first archive/retention return nil } @@ -649,7 +680,7 @@ func (whisper *Whisper) initMetaInfo() { prevArc := whisper.archives[i-1] prevArc.next = arc - if whisper.aggregationMethod != Mix { + if whisper.aggregationMethod != Mix && whisper.compVersion == 1 { prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount } } @@ -673,7 +704,8 @@ func (whisper *Whisper) writeHeader() (err error) { } func (whisper *Whisper) crc32Offset() int { - return len(compressedMagicString) + VersionSize + CompressedMetadataSize - 4 - FreeCompressedMetadataSize + const crc32Size = IntSize + return len(compressedMagicString) + VersionSize + CompressedMetadataSize - crc32Size - FreeCompressedMetadataSize } /* @@ -727,7 +759,7 @@ func (whisper *Whisper) bufferSize() int { } var bufSize int for i, arc := range whisper.archives[1:] { - bufSize += 
arc.secondsPerPoint / whisper.archives[i].secondsPerPoint * PointSize * 2 + bufSize += arc.secondsPerPoint / whisper.archives[i].secondsPerPoint * PointSize * bufferCount } return bufSize } @@ -838,6 +870,10 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) (err error) { return whisper.UpdateManyForArchive(points, -1) } +// Note: for compressed format, extensions is triggered after update is +// done, so updates of the same data set being done in one +// UpdateManyForArchive call would have different result in file than in +// many UpdateManyForArchive calls. func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRetention int) (err error) { // recover panics and return as error defer func() { @@ -859,11 +895,17 @@ func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRe continue } - currentPoints, points = extractPoints(points, now, archive.MaxRetention()) + if whisper.opts.IgnoreNowOnWrite { + currentPoints = points + points = []*TimeSeriesPoint{} + } else { + currentPoints, points = extractPoints(points, now, archive.MaxRetention()) + } if len(currentPoints) == 0 { continue } + // reverse currentPoints reversePoints(currentPoints) if whisper.compressed { @@ -874,6 +916,9 @@ func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRe break } + // TODO: add a new options to update data points in smaller chunks if + // it exceeeds certain size, so extension could be triggered + // properly: ChunkUpdateSize err = whisper.archiveUpdateManyCompressed(archive, currentPoints) } else { err = whisper.archiveUpdateMany(archive, currentPoints) @@ -1065,6 +1110,7 @@ func (whisper *Whisper) propagate(timestamp int, higher, lower *archiveInfo) (bo if len(knownValues) == 0 { return false, nil } + knownPercent := float32(len(knownValues)) / float32(len(series)) if knownPercent < whisper.xFilesFactor { // check we have enough data points to propagate a value return false, nil @@ -1336,6 +1382,7 @@ 
func (r *Retention) SecondsPerPoint() int { return r.secondsPe func (r *Retention) NumberOfPoints() int { return r.numberOfPoints } func (r *Retention) SetAvgCompressedPointSize(size float32) { r.avgCompressedPointSize = size } +// NOTE: the calculation result is not saved on disk func (r *Retention) calculateSuitablePointsPerBlock(defaultSize int) int { if defaultSize == 0 { defaultSize = DefaultPointsPerBlock @@ -1528,7 +1575,7 @@ func aggregate(method AggregationMethod, knownValues []float64) float64 { } return min } - panic("Invalid aggregation method") + panic(fmt.Sprintf("Invalid aggregation method: %d", method)) } func packInt(b []byte, v, i int) int {