From 3ec3f1bc509b39d70984e1c25b0e6ec91f65da9e Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Tue, 14 Jul 2020 20:32:54 +0200 Subject: [PATCH] Send metrics in FIFO order (#7814) Metrics are send from older to newer metrics, even when outputs is failing. In case of buffer full, we still drop the oldest metrics, but non-dropped metrics are send in the order they are received. --- CHANGELOG.md | 1 + models/buffer.go | 60 ++++++++++----------- models/buffer_test.go | 98 +++++++++++++++++------------------ models/running_output_test.go | 10 ++-- 4 files changed, 82 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c79d585341311..5c584b9a092af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -105,6 +105,7 @@ - [#7558](https://github.com/influxdata/telegraf/issues/7558): Remove trailing backslash from tag keys/values in influx serializer. - [#7715](https://github.com/influxdata/telegraf/issues/7715): Fix incorrect Azure SQL DB server properties. - [#7431](https://github.com/influxdata/telegraf/issues/7431): Fix json unmarshal error in the kibana input. +- [#5633](https://github.com/influxdata/telegraf/issues/5633): Send metrics in FIFO order. ## v1.14.5 [2020-06-30] diff --git a/models/buffer.go b/models/buffer.go index 18e9987caec26..9cc1a3d889f38 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -111,7 +111,7 @@ func (b *Buffer) add(m telegraf.Metric) int { b.metricDropped(b.buf[b.last]) dropped++ - if b.last == b.batchFirst && b.batchSize > 0 { + if b.batchSize > 0 { b.batchSize-- b.batchFirst = b.next(b.batchFirst) } @@ -146,8 +146,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) int { return dropped } -// Batch returns a slice containing up to batchSize of the most recently added -// metrics. Metrics are ordered from newest to oldest in the batch. The +// Batch returns a slice containing up to batchSize of the oldest metrics not +// yet dropped. Metrics are ordered from oldest to newest in the batch. The // batch must not be modified by the client. func (b *Buffer) Batch(batchSize int) []telegraf.Metric { b.Lock() @@ -159,18 +159,17 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric { return out } - b.batchFirst = b.cap + b.last - outLen - b.batchFirst %= b.cap + b.batchFirst = b.first b.batchSize = outLen batchIndex := b.batchFirst for i := range out { - out[len(out)-1-i] = b.buf[batchIndex] + out[i] = b.buf[batchIndex] b.buf[batchIndex] = nil batchIndex = b.next(batchIndex) } - b.last = b.batchFirst + b.first = b.nextby(b.first, b.batchSize) b.size -= outLen return out } @@ -198,38 +197,22 @@ func (b *Buffer) Reject(batch []telegraf.Metric) { return } - older := b.dist(b.first, b.batchFirst) free := b.cap - b.size - restore := min(len(batch), free+older) + restore := min(len(batch), free) + skip := len(batch) - restore - // Rotate newer metrics forward the number of metrics that we can restore. - rb := b.batchFirst - rp := b.last - re := b.nextby(rp, restore) - b.last = re + b.first = b.prevby(b.first, restore) + b.size = min(b.size+restore, b.cap) - for rb != rp && rp != re { - rp = b.prev(rp) - re = b.prev(re) + re := b.first - if b.buf[re] != nil { - b.metricDropped(b.buf[re]) - b.first = b.next(b.first) - } - - b.buf[re] = b.buf[rp] - b.buf[rp] = nil - } - - // Copy metrics from the batch back into the buffer; recall that the - // batch is in reverse order compared to b.buf + // Copy metrics from the batch back into the buffer for i := range batch { - if i < restore { - re = b.prev(re) - b.buf[re] = batch[i] - b.size = min(b.size+1, b.cap) - } else { + if i < skip { b.metricDropped(batch[i]) + } else { + b.buf[re] = batch[i] + re = b.next(re) } } @@ -273,6 +256,17 @@ func (b *Buffer) prev(index int) int { return index } +// prevby returns the index that is count older with wrapping. +func (b *Buffer) prevby(index, count int) int { + index -= count + for index < 0 { + index += b.cap + } + + index %= b.cap + return index +} + func (b *Buffer) resetBatch() { b.batchFirst = 0 b.batchSize = 0 diff --git a/models/buffer_test.go b/models/buffer_test.go index fa8fb1668e102..9aef94fb86585 100644 --- a/models/buffer_test.go +++ b/models/buffer_test.go @@ -161,7 +161,7 @@ func TestBuffer_BatchLatest(t *testing.T) { testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(3), + MetricTime(1), MetricTime(2), }, batch) } @@ -177,8 +177,8 @@ func TestBuffer_BatchLatestWrap(t *testing.T) { testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(5), - MetricTime(4), + MetricTime(2), + MetricTime(3), }, batch) } @@ -193,17 +193,17 @@ func TestBuffer_MultipleBatch(t *testing.T) { batch := b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(6), - MetricTime(5), - MetricTime(4), - MetricTime(3), + MetricTime(1), MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), }, batch) b.Accept(batch) batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(1), + MetricTime(6), }, batch) b.Accept(batch) } @@ -223,11 +223,11 @@ func TestBuffer_RejectWithRoom(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(5), - MetricTime(4), - MetricTime(3), - MetricTime(2), MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), }, batch) } @@ -246,11 +246,11 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(5), - MetricTime(4), - MetricTime(3), - MetricTime(2), MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), }, batch) } @@ -275,11 +275,11 @@ func TestBuffer_RejectNoRoom(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(8), - MetricTime(7), - MetricTime(6), - MetricTime(5), MetricTime(4), + MetricTime(5), + MetricTime(6), + MetricTime(7), + MetricTime(8), }, batch) } @@ -299,11 +299,11 @@ func TestBuffer_RejectRoomExact(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(5), - MetricTime(4), - MetricTime(3), - MetricTime(2), MetricTime(1), + MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), }, batch) } @@ -324,11 +324,11 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(6), - MetricTime(5), - MetricTime(4), - MetricTime(3), MetricTime(2), + MetricTime(3), + MetricTime(4), + MetricTime(5), + MetricTime(6), }, batch) } @@ -351,11 +351,11 @@ func TestBuffer_RejectPartialRoom(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(7), - MetricTime(6), - MetricTime(5), - MetricTime(4), MetricTime(3), + MetricTime(4), + MetricTime(5), + MetricTime(6), + MetricTime(7), }, batch) } @@ -394,11 +394,11 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(15), - MetricTime(14), - MetricTime(13), - MetricTime(12), MetricTime(11), + MetricTime(12), + MetricTime(13), + MetricTime(14), + MetricTime(15), }, batch) } @@ -425,11 +425,11 @@ func TestBuffer_RejectWrapped(t *testing.T) { batch = b.Batch(5) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(12), - MetricTime(11), - MetricTime(10), - MetricTime(9), MetricTime(8), + MetricTime(9), + MetricTime(10), + MetricTime(11), + MetricTime(12), }, batch) } @@ -467,16 +467,16 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) { batch = b.Batch(10) testutil.RequireMetricsEqual(t, []telegraf.Metric{ - MetricTime(19), - MetricTime(18), - MetricTime(17), - MetricTime(16), - MetricTime(15), - MetricTime(14), - MetricTime(13), - MetricTime(12), - MetricTime(11), MetricTime(10), + MetricTime(11), + MetricTime(12), + MetricTime(13), + MetricTime(14), + MetricTime(15), + MetricTime(16), + MetricTime(17), + MetricTime(18), + MetricTime(19), }, batch) } diff --git a/models/running_output_test.go b/models/running_output_test.go index 89cd3beec4c97..38f79f9db397d 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -360,7 +360,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) { // Verify that 10 metrics were written assert.Len(t, m.Metrics(), 10) // Verify that they are in order - expected := append(reverse(next5), reverse(first5)...) + expected := append(first5, next5...) assert.Equal(t, expected, m.Metrics()) } @@ -421,9 +421,9 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) { // Verify that 20 metrics were written assert.Len(t, m.Metrics(), 20) // Verify that they are in order - expected := append(reverse(next5), reverse(first5)...) - expected = append(expected, reverse(next5)...) - expected = append(expected, reverse(first5)...) + expected := append(first5, next5...) + expected = append(expected, first5...) + expected = append(expected, next5...) assert.Equal(t, expected, m.Metrics()) } @@ -464,7 +464,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) { // Verify that 6 metrics were written assert.Len(t, m.Metrics(), 6) // Verify that they are in order - expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]} + expected := []telegraf.Metric{first5[0], first5[1], first5[2], first5[3], first5[4], next5[0]} assert.Equal(t, expected, m.Metrics()) }