Take log messages that Grafana/Vector published to NATS JetStream, match them by category, and republish them to new subjects.
Setup HOWTO:
Make sure there is a JetStream stream to read from. Creating a stream with its subjects set to the Vector subject will make it collect the published messages automatically:
nats stream add --subjects=default.nats.vector \
  --description='All messages from vector' \
  --storage=file --replicas=3 \
  --retention=limits --discard=old --max-bytes=20GiB --max-msgs=-1 \
  --max-msgs-per-subject=-1 --max-age=-1 --max-msg-size=-1 \
  --dupe-window=60s --no-allow-rollup --no-deny-delete \
  --no-deny-purge --allow-direct bulk_unfiltered
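Before continuing, you can check that the stream exists and that Vector messages are actually arriving (the message and byte counts obviously depend on what Vector publishes):

# Shows the stream configuration and current message/byte counts.
nats stream info bulk_unfiltered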
Create a consumer which we'll use to read from:
nats consumer add --pull --deliver=all --ack=explicit \
  --replay=instant --filter= --max-deliver=-1 --max-pending=5000 \
  --no-headers-only --backoff=none \
  bulk_unfiltered natsomatch_unfiltered
max-pending defines the maximum number of unacked messages that can be pending at the same time. If you set dev.max_messages_per_batch on the input, max-pending needs to be at or above that number times the number of natsomatch workers (for example, 200 messages per batch across 25 workers calls for a max-pending of at least 5000).
Create a bunch of streams to write to, one for every possible matched subject. Also create a consumer for test purposes while we're at it:
MATCHES=$(cat lib/match/src/log_matcher.rs |
  grep -FA2 'Ok(Match' |
  sed -e '/subject: /!d;s/.*format!("bulk[.]//;s/[.].*//' |
  sort)
for match in $MATCHES; do
  nats stream add --subjects="bulk.$match.>" \
    --description="Bulk $match" --storage=file --replicas=3 \
    --retention=limits --discard=old --max-bytes=2GiB \
    --max-msgs=-1 --max-msgs-per-subject=-1 \
    --max-age=-1 --max-msg-size=-1 --dupe-window=60s --no-allow-rollup \
    --no-deny-delete --no-deny-purge --allow-direct "bulk_match_${match}"
  nats consumer add --pull --deliver=all --ack=explicit \
    --replay=instant --filter= --max-deliver=-1 --max-pending=0 \
    --no-headers-only --backoff=none \
    "bulk_match_${match}" "bulk_match_${match}_consumer"
done
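To confirm that the match streams were created, and to pull a test message from one of them once natsomatch is running, something like this should work (bulk_match_k8s is only a hypothetical name; the real names depend on the matches in lib/match/src/log_matcher.rs):

# List all streams with their message counts.
nats stream report
# Fetch (and ack) one message from a hypothetical match stream's test consumer.
nats consumer next bulk_match_k8s bulk_match_k8s_consumer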
Next, we'll need a couple of these running; the count can be higher or lower depending on consumption rates. There's a Kubernetes setup in doc/. The configuration can be seen below. If you're using Kubernetes, configure it in the ConfigMap.
The /healthz health check and stats reporting listen on port 3000. The Kubernetes container uses this to check liveness.
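A quick manual liveness check against a running instance could look like this (a sketch; any successful HTTP response means the process is up):

# Probe the health/stats endpoint on port 3000.
curl -sf http://localhost:3000/healthz && echo OK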
Configuration for the Rust version:
[input]
# Input source
nats.server = 'nats://10.20.30.40:4222'
# Server certificate validation (tls.server_name is not working)
tls.server_name = 'nats.local'
tls.ca_file = './nats_ca.crt'
# Client certificate
tls.cert_file = './nats_client.crt'
tls.key_file = './nats_client.key'
# Select manually made consumer from stream
nats.stream = 'bulk_unfiltered'
nats.consumer = 'natsomatch_unfiltered'
[sink]
# Output target
nats.server = 'nats://nats.example.com:4222'
nats.auth = { username = 'derek', password = 's3cr3t!' }
# Server certificate validation (tls.server_name is not working)
tls.ca_file = '/etc/ssl/certs/ca-certificates.crt'
# Client certificate
#tls.cert_file = './nats_client.crt'
#tls.key_file = './nats_client.key'
# No need to set jetstream name or subjects. The subject generation is
# hardcoded for now, based on the message.
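With the streams and consumer in place and the configuration written to a file, start the matcher. The -c flag is the one shown in the usage output below; the file name here is just an example:

./target/release/natsomatch -c natsomatch.toml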
☐ Clear (greppable) log message on startup. Clear log message on shutdown.
☐ Hardcoded attributes are now in lib/json/src/payload_parser.rs. Maybe make them configurable.
☐ Hardcoded matching rules are now in lib/match/src/log_matcher.rs. Maybe make them configurable.
☐ See if we can add filters to remove useless messages. We'll want to check some live data here.
☐ Add configurable bind address for /healthz server. Use a ping/pong test on input/sink too?
☐ See if we want to rely on ghcr.io/rust-cross/rust-musl-cross ( https://github.com/rust-cross/rust-musl-cross ) or want to build something from the official images.
☐ See if we want to use cargo-chef for docker layer caching (speeding up release builds).
☐ Stats improvements:
- Count average message length.
- Report stats on output subscriptions (streams) so we can reorder filters for more speed.
☐ Monitoring improvements:
- Right now we have no easy detection of streams that are not handled quickly enough. Maybe check natsomatch_unfiltered for "unprocessed" counts (see the example after this list).
☐ Check and fix behaviour on NATS/JetStream disconnect/error. Consider auto-creating streams. (Where are the settings?)
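Until that monitoring exists, a manual way to spot a backlog is to inspect the read consumer; the unprocessed/ack-pending figures in the output are the interesting ones (exact field names depend on the nats CLI version):

nats consumer info bulk_unfiltered natsomatch_unfiltered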
The git describe version is stored in the binary and shown when it is invoked with bad arguments:
$ ./target/release/natsomatch -v
natsomatch v0.1.0
Usage: ./target/release/natsomatch -c <config-file>
The built binary (if built using cargo auditable build) includes a Software Bill of Materials (SBOM):
$ objcopy --dump-section .dep-v0=/dev/stdout target/release/natsomatch |
python3 -c 'import zlib,sys;print(zlib.decompress(sys.stdin.buffer.read()).decode("utf-8"))' |
jq .
{
  "packages": [
    {
      "name": "aho-corasick",
      "version": "1.1.2",
      "source": "crates.io",
      "dependencies": [
        45
      ]
    },
    {
      "name": "async-nats",
      "version": "0.33.0",
      "source": "crates.io",
      "dependencies": [
        3,
...
String vs. Box<str>: don't use Box<str> to make the string immutable or to try to save a uint. Only use it if you have many, many strings. (Similarly: see Box<[T]> vs. Vec<T>.)

into/to_string/to_owned: to_string is to get a human representation of something; to_owned is for converting a &String (or maybe a &str) to a copy/clone; into is for conversion (String to PathBuf, &str to String).