Commit

PR feedback
mattdurham committed Oct 15, 2024
1 parent b2d5cab commit 57d81b0
Showing 11 changed files with 72 additions and 69 deletions.
2 changes: 1 addition & 1 deletion docs/sources/reference/compatibility/_index.md
@@ -174,7 +174,7 @@ The following components, grouped by namespace, _export_ Prometheus `MetricsReceiver`

{{< collapse title="prometheus" >}}
- [prometheus.relabel](../components/prometheus/prometheus.relabel)
- [prometheus.remote.queue](../components/prometheus/prometheus.remote.queue)
- [prometheus.write.queue](../components/prometheus/prometheus.write.queue)
- [prometheus.remote_write](../components/prometheus/prometheus.remote_write)
{{< /collapse >}}

@@ -1,20 +1,20 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.remote.queue/
description: Learn about prometheus.remote.queue
title: prometheus.remote.queue
canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.write.queue/
description: Learn about prometheus.write.queue
title: prometheus.write.queue
---


<span class="badge docs-labels__stage docs-labels__item">Experimental</span>

# prometheus.remote.queue
# prometheus.write.queue

`prometheus.remote.queue` collects metrics sent from other components into a
`prometheus.write.queue` collects metrics sent from other components into a
Write-Ahead Log (WAL) and forwards them over the network to a series of
user-supplied endpoints. Metrics are sent over the network using the
[Prometheus Remote Write protocol][remote_write-spec].

You can specify multiple `prometheus.remote.queue` components by giving them different labels.
You can specify multiple `prometheus.write.queue` components by giving them different labels.

You should consider everything here extremely experimental and highly subject to change.
[remote_write-spec]: https://prometheus.io/docs/specs/remote_write_spec/
@@ -24,7 +24,7 @@ You should consider everything here extremely experimental and highly subject to change.
## Usage

```alloy
prometheus.remote.queue "LABEL" {
prometheus.write.queue "LABEL" {
endpoint "default "{
url = REMOTE_WRITE_URL
@@ -46,7 +46,7 @@ Name | Type | Description | Default | Required
## Blocks

The following blocks are supported inside the definition of
`prometheus.remote.queue`:
`prometheus.write.queue`:

Hierarchy | Block | Description | Required
--------- | ----- | ----------- | --------
@@ -72,7 +72,7 @@ The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- |-------------------------------------------------------------------------------|---------| --------
`max_signals_to_batch` | `uint` | The maximum number of signals before they are batched to disk. | `10000` | no
`batch_frequency` | `duration` | How often to batch signals to disk if `max_signals_to_batch` is not reached. | `5s` | no
`batch_interval` | `duration` | How often to batch signals to disk if `max_signals_to_batch` is not reached. | `5s` | no
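
For illustration, here is a minimal sketch of how these batching arguments might be set. It assumes the arguments live under a `serialization` block inside the component (the block hierarchy table is collapsed in this diff), and the endpoint URL is a placeholder:

```alloy
prometheus.write.queue "example" {
  // Assumed placement: batching arguments under a serialization block.
  serialization {
    // Write a batch to disk once 10,000 signals accumulate...
    max_signals_to_batch = 10000
    // ...or after 5s, whichever happens first.
    batch_interval = "5s"
  }

  // Placeholder endpoint; replace the URL with your remote write target.
  endpoint "default" {
    url = "http://localhost:9009/api/v1/push"
  }
}
```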


### endpoint block
@@ -90,7 +90,7 @@ Name | Type | Description |
`retry_backoff` | `duration` | How long to wait between retries. | `1s` | no
`max_retry_attempts` | `uint` | Maximum number of retries before dropping the batch. | `0` | no
`batch_count` | `uint` | How many series to queue in each queue. | `1000` | no
`flush_frequency` | `duration` | How long to wait before sending if `batch_count` is not reached. | `1s` | no
`flush_interval` | `duration` | How long to wait before sending if `batch_count` is not reached. | `1s` | no
`parallelism` | `uint` | How many parallel batches to write. | `10` | no
`external_labels` | `map(string)` | Labels to add to metrics sent over the network. | | no
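
As a rough sketch of how the endpoint arguments above fit together (the URL and label values are placeholders, not recommendations):

```alloy
prometheus.write.queue "sharded" {
  endpoint "primary" {
    url = "http://mimir:9009/api/v1/push"

    // Send once 1,000 series are queued, or after 1s if the batch is not full.
    batch_count    = 1000
    flush_interval = "1s"

    // Write up to 10 batches in parallel.
    parallelism = 10

    // Added to every metric sent to this endpoint.
    external_labels = {"cluster" = "staging"}
  }
}
```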

@@ -109,13 +109,13 @@ Name | Type | Description

## Component health

`prometheus.remote.queue` is only reported as unhealthy if given an invalid
`prometheus.write.queue` is only reported as unhealthy if given an invalid
configuration. In those cases, exported fields are kept at their last healthy
values.

## Debug information

`prometheus.remote.queue` does not expose any component-specific debug
`prometheus.write.queue` does not expose any component-specific debug
information.

## Debug metrics
@@ -199,14 +199,14 @@ Metrics that are new to `prometheus.remote.write`. These are highly subject to change.

## Examples

The following examples show you how to create `prometheus.remote.queue` components that send metrics to different destinations.
The following examples show you how to create `prometheus.write.queue` components that send metrics to different destinations.

### Send metrics to a local Mimir instance

You can create a `prometheus.remote.queue` component that sends your metrics to a local Mimir instance:
You can create a `prometheus.write.queue` component that sends your metrics to a local Mimir instance:

```alloy
prometheus.remote.queue "staging" {
prometheus.write.queue "staging" {
  // Send metrics to a locally running Mimir.
  endpoint "mimir" {
    url = "http://mimir:9009/api/v1/push"
@@ -219,21 +219,21 @@ prometheus.remote.queue "staging" {
}

// Configure a prometheus.scrape component to send metrics to
// prometheus.remote.queue component.
// prometheus.write.queue component.
prometheus.scrape "demo" {
  targets = [
    // Collect metrics from the default HTTP listen address.
    {"__address__" = "127.0.0.1:12345"},
  ]
  forward_to = [prometheus.remote.queue.staging.receiver]
  forward_to = [prometheus.write.queue.staging.receiver]
}
```

## Technical details

`prometheus.remote.queue` uses [snappy][] for compression.
`prometheus.remote.queue` sends native histograms by default.
`prometheus.write.queue` uses [snappy][] for compression.
`prometheus.write.queue` sends native histograms by default.
Any labels that start with `__` will be removed before sending to the endpoint.

### Data retention
@@ -243,17 +243,17 @@ Any data that has not been written to disk, or that is in the network queues is

### Retries

`prometheus.remote.queue` will retry sending data if the following errors or HTTP status codes are returned:
`prometheus.write.queue` will retry sending data if the following errors or HTTP status codes are returned:

* Network errors.
* HTTP 429 errors.
* HTTP 5XX errors.

`prometheus.remote.queue` will not retry sending data if any other unsuccessful status codes are returned.
`prometheus.write.queue` will not retry sending data if any other unsuccessful status codes are returned.
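
A hedged sketch of how the retry-related endpoint arguments might be tuned; the values are illustrative only:

```alloy
prometheus.write.queue "retry_example" {
  endpoint "default" {
    url = "http://mimir:9009/api/v1/push"

    // Wait 1s between attempts. Retries only apply to network errors,
    // HTTP 429, and HTTP 5XX responses, as described above.
    retry_backoff = "1s"

    // Give up and drop the batch after 5 failed attempts.
    max_retry_attempts = 5
  }
}
```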

### Memory

`prometheus.remote.queue` is meant to be memory efficient.
`prometheus.write.queue` is meant to be memory efficient.
You can adjust `max_signals_to_batch`, `parallelism`, and `batch_count` to control how much memory is used.
A higher `max_signals_to_batch` allows for more efficient disk compression.
A higher `parallelism` allows more concurrent writes, and a higher `batch_count` allows more data to be sent at one time.
@@ -264,7 +264,7 @@ The defaults are suitable for most common usages.
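
To make the trade-off concrete, the following is a sketch of a higher-throughput (and higher-memory) configuration than the defaults. It again assumes the batching arguments sit under a `serialization` block, and the values are illustrative:

```alloy
prometheus.write.queue "high_throughput" {
  // Assumed block placement; larger on-disk batches compress better
  // but hold more samples in memory.
  serialization {
    max_signals_to_batch = 50000
  }

  endpoint "default" {
    url = "http://mimir:9009/api/v1/push"

    // More series per request and more concurrent writes increase
    // throughput at the cost of memory.
    batch_count = 2000
    parallelism = 20
  }
}
```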

## Compatible components

`prometheus.remote.queue` has exports that can be consumed by the following components:
`prometheus.write.queue` has exports that can be consumed by the following components:

- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers)

2 changes: 1 addition & 1 deletion internal/component/all/all.go
@@ -134,7 +134,7 @@ import (
_ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors
_ "github.com/grafana/alloy/internal/component/prometheus/receive_http" // Import prometheus.receive_http
_ "github.com/grafana/alloy/internal/component/prometheus/relabel" // Import prometheus.relabel
_ "github.com/grafana/alloy/internal/component/prometheus/remote/queue" // Import prometheus.remote.queue
_ "github.com/grafana/alloy/internal/component/prometheus/remote/queue" // Import prometheus.write.queue
_ "github.com/grafana/alloy/internal/component/prometheus/remotewrite" // Import prometheus.remote_write
_ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf
4 changes: 2 additions & 2 deletions internal/component/prometheus/remote/queue/README.md
@@ -4,9 +4,9 @@

## Overview

The goal of `prometheus.remote.queue` is to provide reliable and repeatable memory and CPU usage based on the number of incoming and outgoing series. There are four broad parts to the system.
The goal of `prometheus.write.queue` is to provide reliable and repeatable memory and CPU usage based on the number of incoming and outgoing series. There are four broad parts to the system.

1. The `prometheus.remote.queue` component itself. This handles the lifecycle of the Alloy system.
1. The `prometheus.write.queue` component itself. This handles the lifecycle of the Alloy system.
2. The `serialization` layer converts an array of series into a serializable format. This is handled via the [msgp]() library.
3. The `filequeue` is where the buffers are written. It maintains a series of files that are committed to disk and then read back.
4. The `network` layer handles sending data. The data is sharded by label hash across any number of loops that send data.
13 changes: 8 additions & 5 deletions internal/component/prometheus/remote/queue/component.go
@@ -19,7 +19,7 @@ import (

func init() {
component.Register(component.Registration{
Name: "prometheus.remote.queue",
Name: "prometheus.write.queue",
Args: Arguments{},
Exports: Exports{},
Stability: featuregate.StabilityExperimental,
@@ -42,9 +42,6 @@ func NewComponent(opts component.Options, args Arguments) (*Queue, error) {
		return nil, err
	}

	for _, ep := range s.endpoints {
		ep.Start()
	}
	return s, nil
}

@@ -62,6 +59,12 @@ type Queue struct {
// suffers a fatal error. Run is guaranteed to be called exactly once per
// Component.
func (s *Queue) Run(ctx context.Context) error {
	s.mut.Lock()
	for _, ep := range s.endpoints {
		ep.Start()
	}
	s.mut.Unlock()

	defer func() {
		s.mut.Lock()
		defer s.mut.Unlock()
@@ -135,7 +138,7 @@ func (s *Queue) createEndpoints() error {
}
serial, err := serialization.NewSerializer(types.SerializerConfig{
MaxSignalsInBatch: uint32(s.args.Serialization.MaxSignalsToBatch),
FlushFrequency: s.args.Serialization.BatchFrequency,
FlushFrequency: s.args.Serialization.BatchInterval,
}, fq, stats.UpdateSerializer, s.opts.Logger)
if err != nil {
return err
4 changes: 2 additions & 2 deletions internal/component/prometheus/remote/queue/e2e_bench_test.go
@@ -95,7 +95,7 @@ func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Expo
TTL: 2 * time.Hour,
Serialization: Serialization{
MaxSignalsToBatch: 100_000,
BatchFrequency: 1 * time.Second,
BatchInterval: 1 * time.Second,
},
Endpoints: []EndpointConfig{{
Name: "test",
@@ -104,7 +104,7 @@ func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Expo
RetryBackoff: 1 * time.Second,
MaxRetryAttempts: 0,
BatchCount: 50,
FlushFrequency: 1 * time.Second,
FlushInterval: 1 * time.Second,
Parallelism: 1,
}},
})
4 changes: 2 additions & 2 deletions internal/component/prometheus/remote/queue/e2e_test.go
@@ -357,7 +357,7 @@ func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports,
TTL: 2 * time.Hour,
Serialization: Serialization{
MaxSignalsToBatch: 10_000,
BatchFrequency: 1 * time.Second,
BatchInterval: 1 * time.Second,
},
Endpoints: []EndpointConfig{{
Name: "test",
@@ -366,7 +366,7 @@ func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports,
RetryBackoff: 5 * time.Second,
MaxRetryAttempts: 1,
BatchCount: 50,
FlushFrequency: 1 * time.Second,
FlushInterval: 1 * time.Second,
Parallelism: 1,
}},
})
2 changes: 1 addition & 1 deletion internal/component/prometheus/remote/queue/network/loop.go
@@ -92,7 +92,7 @@ func (l *loop) DoWork(ctx actor.Context) actor.WorkerStatus {
if len(l.series) == 0 {
return actor.WorkerContinue
}
if time.Since(l.lastSend) > l.cfg.FlushFrequency {
if time.Since(l.lastSend) > l.cfg.FlushInterval {
l.trySend(ctx)
}
return actor.WorkerContinue
48 changes: 24 additions & 24 deletions internal/component/prometheus/remote/queue/network/manager_test.go
@@ -34,11 +34,11 @@ func TestSending(t *testing.T) {
defer cncl()

cc := types.ConnectionConfig{
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 10,
FlushFrequency: 1 * time.Second,
Connections: 4,
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 10,
FlushInterval: 1 * time.Second,
Connections: 4,
}

logger := log.NewNopLogger()
@@ -67,11 +67,11 @@ func TestUpdatingConfig(t *testing.T) {
defer svr.Close()

cc := types.ConnectionConfig{
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 10,
FlushFrequency: 5 * time.Second,
Connections: 1,
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 10,
FlushInterval: 5 * time.Second,
Connections: 1,
}

logger := util.TestAlloyLogger(t)
@@ -82,11 +82,11 @@
defer wr.Stop()

cc2 := types.ConnectionConfig{
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 20,
FlushFrequency: 5 * time.Second,
Connections: 1,
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 20,
FlushInterval: 5 * time.Second,
Connections: 1,
}
ctx := context.Background()
err = wr.UpdateConfig(ctx, cc2)
@@ -122,12 +122,12 @@ func TestRetry(t *testing.T) {
defer cncl()

cc := types.ConnectionConfig{
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 1,
FlushFrequency: 1 * time.Second,
RetryBackoff: 100 * time.Millisecond,
Connections: 1,
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 1,
FlushInterval: 1 * time.Second,
RetryBackoff: 100 * time.Millisecond,
Connections: 1,
}

logger := log.NewNopLogger()
@@ -162,7 +162,7 @@ func TestRetryBounded(t *testing.T) {
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 1,
FlushFrequency: 1 * time.Second,
FlushInterval: 1 * time.Second,
RetryBackoff: 100 * time.Millisecond,
MaxRetryAttempts: 1,
Connections: 1,
@@ -200,7 +200,7 @@ func TestRecoverable(t *testing.T) {
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 1,
FlushFrequency: 1 * time.Second,
FlushInterval: 1 * time.Second,
RetryBackoff: 100 * time.Millisecond,
MaxRetryAttempts: 1,
Connections: 1,
@@ -241,7 +241,7 @@ func TestNonRecoverable(t *testing.T) {
URL: svr.URL,
Timeout: 1 * time.Second,
BatchCount: 1,
FlushFrequency: 1 * time.Second,
FlushInterval: 1 * time.Second,
RetryBackoff: 100 * time.Millisecond,
MaxRetryAttempts: 1,
Connections: 1,

0 comments on commit 57d81b0
