Skip to content

Commit

Permalink
fix consumer to take topic from env
Browse files Browse the repository at this point in the history
  • Loading branch information
MaheshRKumawat authored and qu1queee committed Mar 12, 2024
1 parent 33e2570 commit 8899414
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
8 changes: 3 additions & 5 deletions kafka-observer/cmd/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -27,8 +28,6 @@ var idleTimer *time.Timer
// interface
type Consumer struct {
ready chan bool
// ce *codeenginev2.CodeEngineV2
topicToJD map[string]cmd.TopicsToJobs
}

func main() {
Expand Down Expand Up @@ -66,11 +65,10 @@ func main() {

consumer := Consumer{
ready: make(chan bool),
// ce: ceService,
topicToJD: config.KafkaCE,
}

brokers := config.Kafka.Brokers
topics := strings.Split(os.Getenv("KAFKA_TOPIC"), ",")

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(brokers, "consuming-group", saramaConfig)
Expand All @@ -84,7 +82,7 @@ func main() {
defer wg.Done()
for {
// TODO: fix topics, cannot use all
if err := client.Consume(ctx, config.Kafka.Topics, &consumer); err != nil {
if err := client.Consume(ctx, topics, &consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions kafka-observer/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ echo "[INFO] Going to create configmap ${CM_TOPICJOBS_NAME}"
ibmcloud ce configmap create --name $CM_TOPICJOBS_NAME --from-file ${BASEDIR}/resources/kafkadata

echo "[INFO] Going to create JobDefinition ${CONSUMER_JOB_NAME_1}"
ibmcloud ce job create --name $CONSUMER_JOB_NAME_1 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=payment-consumer-group --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait
ibmcloud ce job create --name $CONSUMER_JOB_NAME_1 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=payment-consumer-group --env KAFKA_TOPIC=payments --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait

echo "[INFO] Going to create JobDefinition ${CONSUMER_JOB_NAME_2}"
ibmcloud ce job create --name $CONSUMER_JOB_NAME_2 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=shipping-consumer-group --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait
ibmcloud ce job create --name $CONSUMER_JOB_NAME_2 --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.consumer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --env CONSUMER_GROUP=shipping-consumer-group --env KAFKA_TOPIC=shipping --env CE_REMOVE_COMPLETED_JOBS=IMMEDIATELY --wait

echo "[INFO] Going to create JobDefinition ${OBSERVER_JOB_NAME}"
ibmcloud ce job create --name $OBSERVER_JOB_NAME --mode daemon --build-source ${BASEDIR} --build-dockerfile ${BASEDIR}/Dockerfile.observer --env-from-secret $AUTH_SECRETS_NAME --env-from-configmap $CM_TOPICJOBS_NAME --wait
Expand Down

0 comments on commit 8899414

Please sign in to comment.