diff --git a/vector/replay-capture/vector.yaml b/vector/replay-capture/vector.yaml
index 6428d2c2b9890..8bcc51a10676f 100644
--- a/vector/replay-capture/vector.yaml
+++ b/vector/replay-capture/vector.yaml
@@ -1,9 +1,11 @@
 acknowledgements:
   enabled: true
+
 api:
   enabled: true
   address: 0.0.0.0:8686
   playground: true
+
 enrichment_tables:
   quota_limited_teams:
     type: file
@@ -21,6 +23,7 @@ enrichment_tables:
       type: csv
     schema:
       token: string
+
 sources:
   capture_server:
     type: http_server
@@ -58,6 +61,7 @@ sources:
       assert!(is_string(.message[0].distinct_id), "distinct_id is required")
       assert!(is_string(.message[0].properties."$$session_id"), "$$session_id is required")
       assert!(is_string(%token), "token is required")
+
 transforms:
   quota_check:
     type: route
@@ -69,6 +73,7 @@ transforms:
         source: |
           _, err = get_enrichment_table_record("quota_limited_teams", { "token": %token })
           err == null # err is not null if row not found, we want to drop where the row _is_ found
+
   events_parsed:
     type: remap
     inputs:
@@ -119,6 +124,7 @@ transforms:
       %headers = {
         "token": .token
       }
+
   overflow_check:
     type: route
     inputs:
@@ -129,6 +135,7 @@ transforms:
         source: |
           _, err = get_enrichment_table_record("overflow_sessions", { "session_id": .session_id })
           err == null # err is not null if row not found, we want to drop where the row _is_ found
+
   log_errors:
     type: remap
     inputs:
@@ -141,6 +148,7 @@ transforms:
         "session_id": .message[0].properties."$$session_id",
         "distinct_id": .message[0].distinct_id
       }, rate_limit_secs: 0)
+
   metric_quota_dropped:
     type: log_to_metric
     inputs:
@@ -164,14 +172,18 @@ sinks:
       enabled: true
     inputs:
       - log_errors
+
   dropped:
     type: blackhole
     acknowledgements:
       enabled: true
     inputs:
       - metric_quota_dropped
-  kafka:
+
+  kafka: &kafka
     type: kafka
+    acknowledgements:
+      enabled: true
     inputs:
       - overflow_check._unmatched
     buffer:
@@ -203,34 +215,7 @@ sinks:
     message_timeout_ms: 10000
     socket_timeout_ms: 5000
   kafka_overflow:
-    type: kafka
-    buffer:
-      - type: memory
-        max_events: 10000
-        when_full: block
-    bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS
-    compression: gzip
-    key_field: .session_id
-    headers_key: '%headers'
-    tls:
-      enabled: false
-    encoding:
-      codec: json
-    librdkafka_options:
-      client.id: ${CLIENT_ID:-$HOSTNAME}
-      linger.ms: '0'
-      topic.metadata.refresh.interval.ms: '20000'
-      queue.buffering.max.kbytes: '1048576'
-      queue.buffering.max.messages: '100'
-      message.max.bytes: '64000000'
-      batch.size: '1600000'
-      batch.num.messages: '100'
-      sticky.partitioning.linger.ms: '25'
-      enable.idempotence: 'false'
-      max.in.flight.requests.per.connection: '1000000'
-      partitioner: 'consistent_random'
-    message_timeout_ms: 10000
-    socket_timeout_ms: 5000
+    <<: *kafka
     inputs:
       - overflow_check.overflow
     topic: $KAFKA_OVERFLOW_TOPIC
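
Note (editorial, not part of the patch): the `kafka: &kafka` / `<<: *kafka` pair is standard YAML anchor-and-merge syntax, resolved by the YAML parser before Vector reads the config; it is what lets this patch replace the duplicated `kafka_overflow` block with a single line. A minimal sketch of how the two sinks resolve after merging, using only keys visible in this diff (`$KAFKA_EVENTS_TOPIC` is a hypothetical stand-in for the base sink's topic, which the diff does not show):

    sinks:
      kafka: &kafka                  # anchor captures this whole mapping
        type: kafka
        acknowledgements:
          enabled: true
        inputs:
          - overflow_check._unmatched
        topic: $KAFKA_EVENTS_TOPIC   # hypothetical; not shown in the diff
      kafka_overflow:
        <<: *kafka                   # shallow-merge every key from the anchor;
        inputs:                      # local keys win on conflict
          - overflow_check.overflow
        topic: $KAFKA_OVERFLOW_TOPIC

Two properties worth noting: the anchor must appear earlier in the document than the alias that references it (as it does here), and the merge is shallow, so `kafka_overflow` inherits `buffer`, `librdkafka_options`, and the timeout settings verbatim while overriding only `inputs` and `topic`.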