diff --git a/.env.dist b/.env.dist index 4be37f44..c35e751f 100644 --- a/.env.dist +++ b/.env.dist @@ -1,6 +1,6 @@ -KAFKA_MONGO_WATCHER_REPLAY=true +REPLAY=true -KAFKA_MONGO_WATCHER_PRINT_CONFIG=true -KAFKA_MONGO_WATCHER_LOG_CLI_VERBOSE=false -KAFKA_MONGO_WATCHER_LOG_LEVEL=DEBUG -KAFKA_MONGO_WATCHER_PRODUCER_POOL_SIZE=1 +PRINT_CONFIG=true +LOG_CLI_VERBOSE=false +LOG_LEVEL=DEBUG +PRODUCER_POOL_SIZE=1 diff --git a/README.md b/README.md index adb12a27..e887b8c4 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ You can run it using the following example and pass configuration environment va ```bash $ docker run \ - -e 'KAFKA_MONGO_WATCHER_REPLAY=true' \ + -e 'REPLAY=true' \ etf1/kafka-mongo-watcher:latest ``` @@ -62,7 +62,7 @@ In order to run the watcher, type the following command with the desired argumen You can use flags (as in this example) or environment variables: ```bash -$ ./kafka-mongo-watcher -KAFKA_MONGO_WATCHER_REPLAY=true +$ ./kafka-mongo-watcher -REPLAY=true ... HTTP server started {"facility":"kafka-mongo-watcher","version":"wip","addr":":8001","file":"/usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s","line":1373} Connected to mongodb database {"facility":"kafka-mongo-watcher","version":"wip","uri":"mongodb://root:toor@127.0.0.1:27011,127.0.0.1:27012,127.0.0.1:27013/watcher?replicaSet=replicaset\u0026authSource=admin"} @@ -77,148 +77,157 @@ In dev environment you can copy `.env.dist` in `.env` and edit his content in or You can set/override configuration variables from `.env` file and from `variables environment` and or from cli arguments (If a variables was configured in multiple sources the last will override the previous one) -#### KAFKA_MONGO_WATCHER_CUSTOM_PIPELINE +Configuration variables with the prefix are loaded first, then those without it. For example, if you define `KAFKA_MONGO_WATCHER_MONGODB_URI=xxxx`, it will be used for the MongoDB URI, even if `MONGODB_URI=yyyy` is set. 
This allows overriding in some cases, which is sometimes useful inside a Kubernetes cluster. + +#### KAFKA_MONGO_WATCHER_PREFIX *Type*: string -*Description*: In case you want to specify a filtering pipeline, you can specify it here. It works both wil replay and watch mode. +*Description*: Set this to use a different prefix (instead of `KAFKA_MONGO_WATCHER`) for all configuration environment variables. -*Example value*: `[ { "$match": { "fullDocument.is_active": true } }, { $addFields: { "custom-field": "custom-value" } } ]` +*Example value*: `KAFKA_MONGO_WATCHER_PREFIX=CUSTOM`; in this case, prefixed variables are read with the `CUSTOM_` prefix (e.g. `CUSTOM_MONGODB_URI`). -**Hint**: You can also use some built-in variables such as `%currentTimestamp%` that will put the current timestamp value right in the aggregation pipeline. +#### CUSTOM_PIPELINE +*Type*: string -*Example value with variables*: `[ { "$match": { "date": { "$gt": { "$date": { "$numberLong": "%currentTimestamp%" } } } } } ]` +*Description*: In case you want to specify a filtering pipeline, you can specify it here. It works with both replay and watch modes. -#### KAFKA_MONGO_WATCHER_REPLAY +*Example value*: `[ { "$match": { "fullDocument.is_active": true } }, { "$addFields": { "custom-field": "custom-value" } } ]` + +#### REPLAY *Type*: bool *Description*: In case you want to send all collection's documents once (default: false) -#### KAFKA_MONGO_WATCHER_MONGODB_URI +**Hint**: You can also use some built-in variables such as `%currentTimestamp%` that will put the current timestamp value right in the aggregation pipeline. + +*Example value with variables*: `[ { "$match": { "date": { "$gt": { "$date": { "$numberLong": "%currentTimestamp%" } } } } } ]` + +#### MONGODB_URI *Type*: string *Description*: The MongoDB connection string URI (default: mongodb://root:toor@127.0.0.1:27011,...) 
-#### KAFKA_MONGO_WATCHER_MONGODB_COLLECTION_NAME +#### MONGODB_COLLECTION_NAME *Type*: string *Description*: The MongoDB collection you want to watch (default: "items") -#### KAFKA_MONGO_WATCHER_MONGODB_DATABASE_NAME +#### MONGODB_DATABASE_NAME *Type*: string *Description*: The MongoDB database name you want to connect to (default: "watcher") -#### KAFKA_MONGO_WATCHER_MONGODB_SERVER_SELECTION_TIMEOUT +#### MONGODB_SERVER_SELECTION_TIMEOUT *Type*: duration *Description*: The MongoDB server selection timeout duration (default: 2s) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_BATCH_SIZE +#### MONGODB_OPTION_BATCH_SIZE *Type*: integer *Description*: In case you want to enable watch batch size on MongoDB watch (default: 0 / no batch) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_FULL_DOCUMENT +#### MONGODB_OPTION_FULL_DOCUMENT *Type*: boolean *Description*: In case you want to retrieve the full document when watching for oplogs (default: true) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_MAX_AWAIT_TIME +#### MONGODB_OPTION_MAX_AWAIT_TIME *Type*: duration *Description*: In case you want to set a maximum value awaiting for new oplogs (default: 0 / don't stop) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_RESUME_AFTER +#### MONGODB_OPTION_RESUME_AFTER *Type*: string *Description*: In case you want to set a logical starting point for the change stream (example : `{"_data": <hex string>}`) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_START_AT_OPERATION_TIME_I +#### MONGODB_OPTION_START_AT_OPERATION_TIME_I *Type*: uint32 *(increment value)* -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_START_AT_OPERATION_TIME_T +#### MONGODB_OPTION_START_AT_OPERATION_TIME_T *Type*: uint32 *(timestamp)* *Description*: In case you want to set a timestamp for the change stream to only return changes that occurred at or after the given timestamp (default: nil) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_WATCH_MAX_RETRIES +#### MONGODB_OPTION_WATCH_MAX_RETRIES *Type*: integer *Description*: The max number of retries when trying to watch 
a collection (default: 3, set to 0 to disable retry) -#### KAFKA_MONGO_WATCHER_MONGODB_OPTION_WATCH_RETRY_DELAY +#### MONGODB_OPTION_WATCH_RETRY_DELAY *Type*: duration *Description*: Sleeping delay between two watch attempts (default: 500ms) -#### KAFKA_MONGO_WATCHER_KAFKA_BOOTSTRAP_SERVERS +#### KAFKA_BOOTSTRAP_SERVERS *Type*: string *Description*: Kafka bootstrap servers list (default: "127.0.0.1:9092") -#### KAFKA_MONGO_WATCHER_KAFKA_TOPIC +#### KAFKA_TOPIC *Type*: string *Description*: Kafka topic to write into (default: "kafka-mongo-watcher") -#### KAFKA_MONGO_WATCHER_KAFKA_PRODUCE_CHANNEL_SIZE +#### KAFKA_PRODUCE_CHANNEL_SIZE *Type*: integer *Description*: The maximum size of the internal channel producer size (default: 10000) A big value here can increase the heap memory of the application as all the payload that have to be sent to Kafka will be maintained in channel. -#### KAFKA_MONGO_WATCHER_LOG_CLI_VERBOSE +#### LOG_CLI_VERBOSE *Type*: boolean *Description*: Used to enable/disable log verbosity (default: true) -#### KAFKA_MONGO_WATCHER_LOG_LEVEL +#### LOG_LEVEL *Type*: string *Description*: Used to define first level you want to start display logs (default: "info") -#### KAFKA_MONGO_WATCHER_GRAYLOG_ENDPOINT +#### GRAYLOG_ENDPOINT *Type*: string *Description*: In case you want to push logs into a Graylog server, just fill this entry with the endpoint -#### KAFKA_MONGO_WATCHER_HTTP_IDLE_TIMEOUT +#### HTTP_IDLE_TIMEOUT *Type*: duration *Description*: A idle timeout for HTTP technical server (default: 90s) -#### KAFKA_MONGO_WATCHER_HTTP_READ_HEADER_TIMEOUT +#### HTTP_READ_HEADER_TIMEOUT *Type*: duration *Description*: A read timeout for HTTP technical server (default: 1s) -#### KAFKA_MONGO_WATCHER_HTTP_WRITE_TIMEOUT +#### HTTP_WRITE_TIMEOUT *Type*: duration *Description*: A write timeout for HTTP technical server (default: 10s) -#### KAFKA_MONGO_WATCHER_HTTP_TECH_ADDR +#### HTTP_TECH_ADDR *Type*: string *Description*: A specified address for HTTP technical 
server to listen (default: ":8001") -#### KAFKA_MONGO_WATCHER_PRINT_CONFIG +#### PRINT_CONFIG *Type*: boolean *Description*: Used to enable/disable the configuration print at startup (default: true) -#### KAFKA_MONGO_WATCHER_PPROF_ENABLED +#### PPROF_ENABLED *Type*: boolean *Description*: In case you want to enable Go pprof debugging (default: true). No impact when not used -#### OTEL_COLLECTOR_ENDPOINT +#### OPEN_TELEMETRY_COLLECTOR_ENDPOINT *Type*: string *Description*: In case you want to enable OpenTelemetry tracing, fill this with the `<host>:<port>` of your collector endpoint -#### OTEL_SAMPLE_RATIO +#### OPEN_TELEMETRY_SAMPLE_RATIO *Type*: float64 *Description*: A fraction between 0 and 1 to enable sampling OpenTelemetry traces diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go index d8eafae1..e6c32480 100644 --- a/cmd/watcher/main.go +++ b/cmd/watcher/main.go @@ -11,11 +11,16 @@ import ( signal_subscriber "github.com/gol4ng/signal" ) -const ( +var ( configPrefix = "kafka_mongo_watcher" ) func main() { + + if prefixFromEnv := os.Getenv("KAFKA_MONGO_WATCHER_PREFIX"); prefixFromEnv != "" { + configPrefix = prefixFromEnv + } + ctx, cancel := context.WithCancel(context.Background()) cfg := config.NewBase(ctx, configPrefix) diff --git a/cmd/watcher/main_test.go b/cmd/watcher/main_test.go index e3676d92..cb70df35 100644 --- a/cmd/watcher/main_test.go +++ b/cmd/watcher/main_test.go @@ -30,7 +30,7 @@ var cfg *config.Base func setupConfig(ctx context.Context) { if cfg == nil { - os.Setenv("KAFKA_MONGO_WATCHER_PRINT_CONFIG", "false") + os.Setenv("PRINT_CONFIG", "false") cfg = config.NewBase(ctx, configPrefix) cfg.LogCliVerbose = false diff --git a/config/config.go b/config/config.go index 442a58ed..8886aa12 100644 --- a/config/config.go +++ b/config/config.go @@ -25,8 +25,8 @@ type Base struct { GraylogEndpoint string `config:"GRAYLOG_ENDPOINT"` Replay bool `config:"REPLAY"` CustomPipeline string `config:"CUSTOM_PIPELINE"` - OtelCollectorEndpoint string 
`config:"OTEL_COLLECTOR_ENDPOINT"` - OtelSampleRatio float64 `config:"OTEL_SAMPLE_RATIO"` + OtelCollectorEndpoint string `config:"OPEN_TELEMETRY_COLLECTOR_ENDPOINT"` + OtelSampleRatio float64 `config:"OPEN_TELEMETRY_SAMPLE_RATIO"` PprofEnabled bool `config:"PPROF_ENABLED"` HttpServer