Skip to content

Commit

Permalink
Cut 0.23.2-rc.1 (#4969)
Browse files Browse the repository at this point in the history
* downsample: fix deadlock if error occurs (#4962)

Fix deadlock that occurs if there is some backlog of blocks to be
deduplicated, and an error occurs. For this to trigger, there needs to
be at least `downsampleConcurrency + 2` blocks in the backlog.

Add test to cover this regression. Affected versions 0.22.0 - 0.23.1.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* VERSION: bump

Signed-off-by: Giedrius Statkevičius <[email protected]>

* Makefile: update SHA256

Update from
https://github.com/thanos-io/thanos/blob/main/Makefile#L16-L17.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* *: 0.23.2 -> 0.23.2-rc.1

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS authored Dec 20, 2021
1 parent 6320327 commit 0c6d6fb
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 62 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan

We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## [v0.23.2-rc.0](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.12.09
## [v0.23.2-rc.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.12.20

### Fixed

- [#4795](https://github.com/thanos-io/thanos/pull/4795) Query: Fix deadlock in endpointset.
- [#4962](https://github.com/thanos-io/thanos/pull/4962) Compact/downsample: fix deadlock if error occurs with some backlog of blocks; fixes [this pull request](https://github.com/thanos-io/thanos/pull/4430). Affected versions are 0.22.0 - 0.23.1.

## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ arch = $(shell uname -m)
# Update at 2021.12.08
ifeq ($(arch), x86_64)
# amd64
BASE_DOCKER_SHA="97a9aacc097e5dbdec33b0d671adea0785e76d26ff2b979ee28570baf6a9155d"
BASE_DOCKER_SHA="768a51a5f71827471e6e58f0d6200c2fa24f2cb5cde1ecbd67fe28f93d4ef464"
else ifeq ($(arch), armv8)
# arm64
BASE_DOCKER_SHA="5feb736d32e5b57f4944691d00b581f1f9192b3732cab03e3b6034cf0d1c8f2c"
BASE_DOCKER_SHA="042d6195e1793b226d1632117cccb4c4906c8ab393b8b68328ad43cf59c64f9d"
else
echo >&2 "only support amd64 or arm64 arch" && exit 1
endif
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.23.2-rc.0
0.23.2-rc.1
131 changes: 73 additions & 58 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
Expand All @@ -20,13 +21,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand Down Expand Up @@ -229,90 +230,104 @@ func downsampleBucket(
})

var (
eg errgroup.Group
ch = make(chan *metadata.Meta, downsampleConcurrency)
wg sync.WaitGroup
metaCh = make(chan *metadata.Meta)
downsampleErrs errutil.MultiError
errCh = make(chan error, downsampleConcurrency)
workerCtx, workerCancel = context.WithCancel(ctx)
)

defer workerCancel()

level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency)
for i := 0; i < downsampleConcurrency; i++ {
eg.Go(func() error {
for m := range ch {
wg.Add(1)
go func() {
defer wg.Done()
for m := range metaCh {
resolution := downsample.ResLevel1
errMsg := "downsampling to 5 min"
if m.Thanos.Downsample.Resolution == downsample.ResLevel1 {
resolution = downsample.ResLevel2
errMsg = "downsampling to 60 min"
}
if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, errMsg)
errCh <- errors.Wrap(err, errMsg)
}
metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
}
return nil
})
}()
}

// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)
for _, mk := range metasULIDS {
m := metas[mk]
metaSendLoop:
for _, mk := range metasULIDS {
m := metas[mk]

switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel2:
continue
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel2:
continue

case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}

case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
missing = true
break
}
}

select {
case <-ctx.Done():
return ctx.Err()
case ch <- m:
if !missing {
continue
}
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
}
return nil
})

if err := eg.Wait(); err != nil {
return errors.Wrap(err, "downsample bucket")
select {
case <-workerCtx.Done():
downsampleErrs.Add(workerCtx.Err())
break metaSendLoop
case metaCh <- m:
case downsampleErr := <-errCh:
downsampleErrs.Add(downsampleErr)
break metaSendLoop
}
}
return nil

close(metaCh)
wg.Wait()
workerCancel()
close(errCh)

// Collect any other error reported by the workers.
for downsampleErr := range errCh {
downsampleErrs.Add(downsampleErr)
}

return downsampleErrs.Err()
}

func processDownsampling(
Expand Down
141 changes: 141 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package main

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"

Expand All @@ -25,6 +28,144 @@ import (
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

type erroringBucket struct {
bkt objstore.InstrumentedBucket
}

func (b *erroringBucket) Close() error {
return b.bkt.Close()
}

// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *erroringBucket) WithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.Bucket {
return b.bkt.WithExpectedErrs(f)
}

// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *erroringBucket) ReaderWithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return b.bkt.ReaderWithExpectedErrs(f)
}

func (b *erroringBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return b.bkt.Iter(ctx, dir, f, options...)
}

// Get returns a reader for the given object name.
func (b *erroringBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if strings.Contains(name, "chunk") {
return nil, fmt.Errorf("some random error has occurred")
}
return b.bkt.Get(ctx, name)
}

// GetRange returns a new range reader for the given object name and range.
func (b *erroringBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
if strings.Contains(name, "chunk") {
return nil, fmt.Errorf("some random error has occurred")
}
return b.bkt.GetRange(ctx, name, off, length)
}

// Exists checks if the given object exists in the bucket.
func (b *erroringBucket) Exists(ctx context.Context, name string) (bool, error) {
return b.bkt.Exists(ctx, name)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
return b.bkt.IsObjNotFoundErr(err)
}

// Attributes returns information about the specified object.
func (b *erroringBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bkt.Attributes(ctx, name)
}

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
func (b *erroringBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return b.bkt.Upload(ctx, name, r)
}

// Delete removes the object with the given name.
// If object does not exists in the moment of deletion, Delete should throw error.
func (b *erroringBucket) Delete(ctx context.Context, name string) error {
return b.bkt.Delete(ctx, name)
}

// Name returns the bucket name for the provider.
func (b *erroringBucket) Name() string {
return b.bkt.Name()
}

// Ensures that downsampleBucket() stops its work properly
// after an error occurs with some blocks in the backlog.
// Testing for https://github.com/thanos-io/thanos/issues/4960.
func TestRegression4960_Deadlock(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
bkt = &erroringBucket{bkt: bkt}
var id, id2, id3 ulid.ULID
{
id, err = e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
}
{
id2, err = e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "2"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "2"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc))
}
{
id3, err = e2eutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "2"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "2"}},
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)
testutil.NotOk(t, err)

testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))

}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
Expand Down

0 comments on commit 0c6d6fb

Please sign in to comment.