From c89841af3db47ecb7895112e2f08220acdbffd8f Mon Sep 17 00:00:00 2001 From: MaheshRKumawat Date: Mon, 11 Mar 2024 18:33:23 +0530 Subject: [PATCH] fix consumer to take topic from env Signed-off-by: encalada --- kafka-observer/cmd/consumer/main.go | 8 +++----- kafka-observer/run.sh | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/kafka-observer/cmd/consumer/main.go b/kafka-observer/cmd/consumer/main.go index 0da53639..23ac43ef 100644 --- a/kafka-observer/cmd/consumer/main.go +++ b/kafka-observer/cmd/consumer/main.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -27,8 +28,6 @@ var idleTimer *time.Timer // interface type Consumer struct { ready chan bool - // ce *codeenginev2.CodeEngineV2 - topicToJD map[string]cmd.TopicsToJobs } func main() { @@ -66,11 +65,10 @@ func main() { consumer := Consumer{ ready: make(chan bool), - // ce: ceService, - topicToJD: config.KafkaCE, } brokers := config.Kafka.Brokers + topics := strings.Split(os.Getenv("KAFKA_TOPIC"), ",") ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(brokers, "consuming-group", saramaConfig) @@ -84,7 +82,7 @@ func main() { defer wg.Done() for { // TODO: fix topics, cannot use all - if err := client.Consume(ctx, config.Kafka.Topics, &consumer); err != nil { + if err := client.Consume(ctx, topics, &consumer); err != nil { if errors.Is(err, sarama.ErrClosedConsumerGroup) { return } diff --git a/kafka-observer/run.sh b/kafka-observer/run.sh index 64fc9494..324ce95c 100755 --- a/kafka-observer/run.sh +++ b/kafka-observer/run.sh @@ -62,10 +62,10 @@ echo "[INFO] Going to create configmap ${CM_TOPICJOBS_NAME}" ibmcloud ce configmap create --name $CM_TOPICJOBS_NAME --from-file ${BASEDIR}/resources/kafkadata echo "[INFO] Going to create JobDefinition ${CONSUMER_JOB_NAME_1}" -ibmcloud ce job create --name $CONSUMER_JOB_NAME_1 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=payment-consumer-group --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait +ibmcloud ce job create --name $CONSUMER_JOB_NAME_1 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=payment-consumer-group --env KAFKA_TOPIC=payments --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait echo "[INFO] Going to create JobDefinition ${CONSUMER_JOB_NAME_2}" -ibmcloud ce job create --name $CONSUMER_JOB_NAME_2 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=shipping-consumer-group --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait +ibmcloud ce job create --name $CONSUMER_JOB_NAME_2 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=shipping-consumer-group --env KAFKA_TOPIC=shipping --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait echo "[INFO] Going to create JobDefinition ${OBSERVER_JOB_NAME}" ibmcloud ce job create --name $OBSERVER_JOB_NAME --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.observer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --wait