Skip to content

Commit

Permalink
Ensure consistent return order when querying a servergroup
Browse files Browse the repository at this point in the history
Without this, whichever target responded first was the "base". This works
for most things, but when there are rates etc. the results can sometimes differ.
A noticeable case was a query like sum(subset) / sum(totalset), where
you would sometimes get values >1 -- this was due to the fact that the
numbers in the query weren't coming from the same server in the
servergroup. This patch makes a best-effort attempt to ensure that they do. There is
still a possibility of an issue if the servergroup target list changes
mid-query, but otherwise it's resolved.
  • Loading branch information
jacksontj committed Jul 6, 2018
1 parent 42e9dd1 commit 109f8ad
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 91 deletions.
102 changes: 53 additions & 49 deletions servergroup/servergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,30 +205,30 @@ func (s *ServerGroup) RemoteRead(ctx context.Context, start, end time.Time, matc

childContext, childContextCancel := context.WithCancel(ctx)
defer childContextCancel()
resultChan := make(chan model.Value, len(targets))
errChan := make(chan error, len(targets))
resultChans := make([]chan interface{}, len(targets))

for _, target := range targets {
for i, target := range targets {
resultChans[i] = make(chan interface{}, 1)
parsedUrl, err := url.Parse(target + "/api/v1/read")
if err != nil {
return nil, err
}
go func(stringUrl *url.URL) {
go func(retChan chan interface{}, stringUrl *url.URL) {
cfg := &remote.ClientConfig{
URL: &config_util.URL{parsedUrl},
// TODO: from context?
Timeout: model.Duration(time.Minute * 2),
}
client, err := remote.NewClient(1, cfg)
if err != nil {
errChan <- err
retChan <- err
return
}

start := time.Now()
query, err := remote.ToQuery(int64(timestamp.FromTime(start)), int64(timestamp.FromTime(end)), matchers)
if err != nil {
errChan <- err
retChan <- err
return
}
// http://localhost:8083/api/v1/query?query=%7B__name__%3D%22metric%22%7D%5B302s%5D&time=21
Expand All @@ -237,7 +237,7 @@ func (s *ServerGroup) RemoteRead(ctx context.Context, start, end time.Time, matc

if err != nil {
serverGroupSummary.WithLabelValues(parsedUrl.Host, "remoteread", "error").Observe(float64(took))
errChan <- err
retChan <- err
} else {
serverGroupSummary.WithLabelValues(parsedUrl.Host, "remoteread", "success").Observe(float64(took))
// convert result (timeseries) to SampleStream
Expand All @@ -264,14 +264,14 @@ func (s *ServerGroup) RemoteRead(ctx context.Context, start, end time.Time, matc

err = promhttputil.ValueAddLabelSet(matrix, s.Cfg.Labels)
if err != nil {
errChan <- err
retChan <- err
return
}

resultChan <- matrix
retChan <- matrix
}

}(parsedUrl)
}(resultChans[i], parsedUrl)
}

// Wait for results as we get them
Expand All @@ -283,20 +283,22 @@ func (s *ServerGroup) RemoteRead(ctx context.Context, start, end time.Time, matc
case <-ctx.Done():
return nil, ctx.Err()

case err := <-errChan:
lastError = err
errCount++

case childResult := <-resultChan:
// If the server responded with a non-success, lets mark that as an error
// TODO: check qData.ResultType
if result == nil {
result = childResult
} else {
var err error
result, err = promhttputil.MergeValues(s.Cfg.GetAntiAffinity(), result, childResult)
if err != nil {
return nil, err
case ret := <-resultChans[i]:
switch retTyped := ret.(type) {
case error:
lastError = retTyped
errCount++
case model.Value:
// If the server responded with a non-success, lets mark that as an error
// TODO: check qData.ResultType
if result == nil {
result = retTyped
} else {
var err error
result, err = promhttputil.MergeValues(s.Cfg.GetAntiAffinity(), result, retTyped)
if err != nil {
return nil, err
}
}
}
}
Expand Down Expand Up @@ -337,27 +339,27 @@ func (s *ServerGroup) GetData(ctx context.Context, path string, inValues url.Val

childContext, childContextCancel := context.WithCancel(ctx)
defer childContextCancel()
resultChan := make(chan *promclient.DataResult, len(targets))
errChan := make(chan error, len(targets))
resultChans := make([]chan interface{}, len(targets))

for _, target := range targets {
for i, target := range targets {
resultChans[i] = make(chan interface{}, 1)
parsedUrl, err := url.Parse(target + path)
if err != nil {
return nil, err
}
parsedUrl.RawQuery = values.Encode()
go func(stringUrl string) {
go func(retChan chan interface{}, stringUrl string) {
start := time.Now()
result, err := promclient.GetData(childContext, stringUrl, s.Client, s.Cfg.Labels)
took := time.Now().Sub(start)
if err != nil {
serverGroupSummary.WithLabelValues(parsedUrl.Host, "getdata", "error").Observe(float64(took))
errChan <- err
retChan <- err
} else {
serverGroupSummary.WithLabelValues(parsedUrl.Host, "getdata", "success").Observe(float64(took))
resultChan <- result
retChan <- result
}
}(parsedUrl.String())
}(resultChans[i], parsedUrl.String())
}

// Wait for results as we get them
Expand All @@ -369,26 +371,28 @@ func (s *ServerGroup) GetData(ctx context.Context, path string, inValues url.Val
case <-ctx.Done():
return nil, ctx.Err()

case err := <-errChan:
lastError = err
errCount++

case childResult := <-resultChan:
// If the server responded with a non-success, lets mark that as an error
if childResult.Status != promhttputil.StatusSuccess {
lastError = fmt.Errorf(childResult.Error)
case ret := <-resultChans[i]:
switch retTyped := ret.(type) {
case error:
lastError = retTyped
errCount++
continue
}
case *promclient.DataResult:
// If the server responded with a non-success, lets mark that as an error
if retTyped.Status != promhttputil.StatusSuccess {
lastError = fmt.Errorf(retTyped.Error)
errCount++
continue
}

// TODO: check qData.ResultType
if result == nil {
result = childResult.Data.Result
} else {
var err error
result, err = promhttputil.MergeValues(s.Cfg.GetAntiAffinity(), result, childResult.Data.Result)
if err != nil {
return nil, err
// TODO: check qData.ResultType
if result == nil {
result = retTyped.Data.Result
} else {
var err error
result, err = promhttputil.MergeValues(s.Cfg.GetAntiAffinity(), result, retTyped.Data.Result)
if err != nil {
return nil, err
}
}
}
}
Expand Down
98 changes: 56 additions & 42 deletions servergroup/servergroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ type ServerGroups []*ServerGroup
func (s ServerGroups) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, error) {
childContext, childContextCancel := context.WithCancel(ctx)
defer childContextCancel()
resultChan := make(chan model.Value, len(s))
errChan := make(chan error, len(s))
resultChans := make([]chan interface{}, len(s))

// Scatter out all the queries
for _, serverGroup := range s {
go func(serverGroup *ServerGroup) {
for i, serverGroup := range s {
resultChans[i] = make(chan interface{}, 1)
go func(retChan chan interface{}, serverGroup *ServerGroup) {
result, err := serverGroup.GetValue(childContext, start, end, matchers)
if err != nil {
errChan <- err
retChan <- err
} else {
resultChan <- result
retChan <- result
}
}(serverGroup)
}(resultChans[i], serverGroup)
}

// Wait for results as we get them
Expand All @@ -41,16 +41,20 @@ func (s ServerGroups) GetValue(ctx context.Context, start, end time.Time, matche
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errChan:
lastError = err
errCount++
case childResult := <-resultChan:
var err error
result, err = promhttputil.MergeValues(model.TimeFromUnix(0), result, childResult)
if err != nil {
return nil, err
case ret := <-resultChans[i]:
switch retTyped := ret.(type) {
case error:
lastError = retTyped
errCount++
case model.Value:
var err error
result, err = promhttputil.MergeValues(model.TimeFromUnix(0), result, retTyped)
if err != nil {
return nil, err
}
}
}

}

// If we got only errors, lets return that
Expand All @@ -64,19 +68,20 @@ func (s ServerGroups) GetValue(ctx context.Context, start, end time.Time, matche
func (s ServerGroups) GetData(ctx context.Context, path string, values url.Values) (model.Value, error) {
childContext, childContextCancel := context.WithCancel(ctx)
defer childContextCancel()
resultChan := make(chan model.Value, len(s))
errChan := make(chan error, len(s))

resultChans := make([]chan interface{}, len(s))

// Scatter out all the queries
for _, serverGroup := range s {
go func(serverGroup *ServerGroup) {
for i, serverGroup := range s {
resultChans[i] = make(chan interface{}, 1)
go func(retChan chan interface{}, serverGroup *ServerGroup) {
result, err := serverGroup.GetData(childContext, path, values)
if err != nil {
errChan <- err
retChan <- err
} else {
resultChan <- result
retChan <- result
}
}(serverGroup)
}(resultChans[i], serverGroup)
}

// Wait for results as we get them
Expand All @@ -87,14 +92,17 @@ func (s ServerGroups) GetData(ctx context.Context, path string, values url.Value
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errChan:
lastError = err
errCount++
case childResult := <-resultChan:
var err error
result, err = promhttputil.MergeValues(model.TimeFromUnix(0), result, childResult)
if err != nil {
return nil, err
case ret := <-resultChans[i]:
switch retTyped := ret.(type) {
case error:
lastError = retTyped
errCount++
case model.Value:
var err error
result, err = promhttputil.MergeValues(model.TimeFromUnix(0), result, retTyped)
if err != nil {
return nil, err
}
}
}
}
Expand All @@ -110,19 +118,22 @@ func (s ServerGroups) GetData(ctx context.Context, path string, values url.Value
func (s ServerGroups) GetValuesForLabelName(ctx context.Context, path string) (*promclient.LabelResult, error) {
childContext, childContextCancel := context.WithCancel(ctx)
defer childContextCancel()
resultChans := make([]chan interface{}, len(s))

resultChan := make(chan *promclient.LabelResult, len(s))
errChan := make(chan error, len(s))

// Scatter out all the queries
for _, serverGroup := range s {
go func(serverGroup *ServerGroup) {
for i, serverGroup := range s {
resultChans[i] = make(chan interface{}, 1)
go func(retChan chan interface{}, serverGroup *ServerGroup) {
result, err := serverGroup.GetValuesForLabelName(childContext, path)
if err != nil {
errChan <- err
} else {
resultChan <- result
}
}(serverGroup)
}(resultChans[i], serverGroup)
}

// Wait for results as we get them
Expand All @@ -133,15 +144,18 @@ func (s ServerGroups) GetValuesForLabelName(ctx context.Context, path string) (*
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errChan:
lastError = err
errCount++
case childResult := <-resultChan:
if result == nil {
result = childResult
} else {
if err := result.Merge(childResult); err != nil {
return nil, err
case ret := <-resultChans[i]:
switch retTyped := ret.(type) {
case error:
lastError = retTyped
errCount++
case *promclient.LabelResult:
if result == nil {
result = retTyped
} else {
if err := result.Merge(retTyped); err != nil {
return nil, err
}
}
}
}
Expand Down

0 comments on commit 109f8ad

Please sign in to comment.