Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Observer Sample Pattern #80

Merged
merged 11 commits into from
Apr 22, 2024
Merged

Conversation

qu1queee
Copy link
Member

This introduces a Kafka Observer Pattern for consumes a stream of messages from Kafka.

A so called Observer entity is provided, this Observer has the task to consume messages from multiple Kafka Topics and wake-up the so called Consumers whenever new messages arrived.

Consumers are the second entity in this sample. They will consume messages from a particular Kafka Topic after being wake-up by the Observer. Note that users might bring their own Consumer Kafka Clients code. In this example, a Consumer client written in go is provided. Consumers will gracefully shut-down after one minute, if no new messages are coming through the stream.

A comprehensive README is provided, on how to use this sample.

@qu1queee qu1queee requested a review from reggeenr March 12, 2024 07:14
@qu1queee qu1queee force-pushed the qu1queee/kafka_observer branch 3 times, most recently from b433c75 to c89841a Compare March 12, 2024 07:23
Copy link
Collaborator

@reggeenr reggeenr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi team, great job. Based on the on the observer pattern I was able to cut down costs in my sample scenarios by roughly 50% while still staying the acceptable performance ranges for the consumption latency.

Based on the extensive tests that I ran, I suggest a few tweaks and changes in the observer and consumer implementation. Besides some performance tweaks, I would like to revisit the solution design w.r.t. how the observer consumes messages from the topics.

Again, great job!

kafka-observer/README.md Outdated Show resolved Hide resolved
kafka-observer/README.md Outdated Show resolved Hide resolved
kafka-observer/cmd/observer/main.go Show resolved Hide resolved
kafka-observer/cmd/consumer/main.go Outdated Show resolved Hide resolved
kafka-observer/cmd/consumer/main.go Show resolved Hide resolved
kafka-observer/run.sh Outdated Show resolved Hide resolved
kafka-observer/run.sh Outdated Show resolved Hide resolved
kafka-observer/run.sh Outdated Show resolved Hide resolved
kafka-observer/Dockerfile.observer Outdated Show resolved Hide resolved
kafka-observer/Dockerfile.consumer Outdated Show resolved Hide resolved
@qu1queee qu1queee force-pushed the qu1queee/kafka_observer branch 6 times, most recently from 46e04ef to c882b9c Compare April 12, 2024 09:31
@qu1queee qu1queee requested a review from reggeenr April 12, 2024 09:31
qu1queee and others added 11 commits April 13, 2024 10:41
Introduce kafkadata file
Add run.sh
Add related Dockerfile's
Improve README

Signed-off-by: encalada <[email protected]>
Signed-off-by: Karan Kumar <[email protected]>
Signed-off-by: encalada <[email protected]>
Signed-off-by: encalada <[email protected]>
Signed-off-by: encalada <[email protected]>
Enhance config fields

Signed-off-by: encalada <[email protected]>
Adopt the code to read from the embedded env Vars
into our Jobs. Adjust the run.sh script and the
README accordingly.

Signed-off-by: encalada <[email protected]>
- Implement the custom KafkaTopic interface. This
allow us to operate on Topics and Consumer Groups
offsets. It also uses goroutines and channels, to
determine when a change is done in terms of offsets
comparisons. We dont longer commit neither read
topic messages.
- Improve logging
- Add missing functions comments
- Fixes for job_invoker
- Improve handling on env vars for both consumers
and observers.
- Move images from golang to alpine
- Introduce IDLE_TIMEOUT env var

Signed-off-by: encalada <[email protected]>
Signed-off-by: Mahesh Kumawat <[email protected]>
Signed-off-by: encalada <[email protected]>
Co-authored-by: qu1queee <[email protected]>
Signed-off-by: encalada <[email protected]>
- when new topic and consumer group
- make ticker configurable

Co-authored-by: qu1queee <[email protected]>
Signed-off-by: encalada <[email protected]>
@qu1queee qu1queee force-pushed the qu1queee/kafka_observer branch from 645139c to 5fd21fe Compare April 13, 2024 08:41
reggeenr
reggeenr previously approved these changes Apr 16, 2024
Copy link
Collaborator

@reggeenr reggeenr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - Great job, team!

@reggeenr
Copy link
Collaborator

While testing, I realized that from time to time one can see large outliers. First I thought this is due to a timing issue in which the observer does not start a new instance because there seems to be still a jobrun around but then the jobrun stops right after and so misses new messages that arrive.

However, I can see that in the log below the observer did not noticed new messages sent to message topic-chain-2. I used the following configuration:

topic-chain-1:
  partitions: 1
  jobs:
    - name: consumer-chain-first
      consumer_group: chain-consumers
    - name: consumer-chain-second
      consumer_group: chain-consumers
    - name: consumer-chain-third
      consumer_group: chain-consumers
topic-chain-2:
  partitions: 1
  jobs:
    - name: consumer-chain-second
      consumer_group: chain-consumers
    - name: consumer-chain-third
      consumer_group: chain-consumers
topic-chain-3:
  partitions: 1
  jobs:
    - name: consumer-chain-third
      consumer_group: chain-consumers

Apr 16 23:30:34 Code Engine consumer-chain-second-jobrun-99hw5-0-0 2024/04/16 21:30:33 we are done, timeout expired
Apr 16 23:30:34 Code Engine consumer-chain-second-jobrun-99hw5-0-0 2024/04/16 21:30:33 Marking instance 'consumer-chain-second-jobrun-99hw5-0-0' as stopped ...
Apr 16 23:30:34 Code Engine consumer-chain-second-jobrun-99hw5-0-0 2024/04/16 21:30:33 completed
Apr 16 23:30:36 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:30:34 jobrun consumer-chain-first-jobrun-54nmg has been scheduled
Apr 16 23:30:36 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:30:35 consumed offset for topic: topic-chain-2, offset: map[0:17]
Apr 16 23:30:36 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:30:35 topicOffset: 0/17, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:30:36 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:30:35 topicOffset: 0/17, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:30:36 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:30:35 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:30:36 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:30:36 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:30:36 Code Engine kafka-producer-jobrun-4l76c-0-0 Sending 8 new messages to the topic 'topic-chain-1' ...
Apr 16 23:30:36 Code Engine kafka-producer-jobrun-4l76c-0-0 sent 8 message to topic 'topic-chain-1', new offset: 46, errorCode: '0, duration: 12ms'
Apr 16 23:30:36 Code Engine kafka-producer-jobrun-4l76c-0-0 stored 8 messages in DB, duration: 4ms'
Apr 16 23:30:36 Code Engine kafka-producer-jobrun-4l76c-0-0 Pausing for 71 seconds ...
Tuesday, Apr 16th 2024, 11:30pm
Apr 16 23:30:44 Code Engine consumer-chain-third-jobrun-6k6j8-0-0 2024/04/16 21:30:34 completed
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:34 Connecting to DB ...
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:34 Connection to DB established!
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:34 Marking instance 'consumer-chain-first-jobrun-54nmg-0-0' as started ...
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:34 Starting a new Sarama consumer
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:34 idle timeout of consumer set to 10 seconds
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 sarama consumer up and running!...
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-f933b6a3-2b0f-44b8-9a48-75f2981b2a67-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.655761403s, topic = topic-chain-1, partition = 0, offset = 46
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-f933b6a3-2b0f-44b8-9a48-75f2981b2a67-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-dbc1aaa5-33f5-4e34-88b9-51ba2429a7e1-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.729621193s, topic = topic-chain-1, partition = 0, offset = 47
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-dbc1aaa5-33f5-4e34-88b9-51ba2429a7e1-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-f5891aa9-c3ee-49e6-84a5-a31b0c00700f-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.737840089s, topic = topic-chain-1, partition = 0, offset = 48
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-f5891aa9-c3ee-49e6-84a5-a31b0c00700f-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-0b959b66-8488-4d04-813c-3ffdbae97f00-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.745216279s, topic = topic-chain-1, partition = 0, offset = 49
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-0b959b66-8488-4d04-813c-3ffdbae97f00-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-5ff3e9e0-e2cb-4492-aa90-2044cbb6585d-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.753223384s, topic = topic-chain-1, partition = 0, offset = 50
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-5ff3e9e0-e2cb-4492-aa90-2044cbb6585d-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-ba300d01-b2ad-472f-aa59-b268e07a18bb-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.764374181s, topic = topic-chain-1, partition = 0, offset = 51
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-ba300d01-b2ad-472f-aa59-b268e07a18bb-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-baf3f57f-9295-44dd-8f15-4a0f7c66ebf7-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.774467963s, topic = topic-chain-1, partition = 0, offset = 52
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-baf3f57f-9295-44dd-8f15-4a0f7c66ebf7-1713303031709' to topic topic-chain-2
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 message claimed: key= key-f48189fc-3bb5-4f43-aadc-6000b44166ad-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:31.709 +0000 UTC, latency = 3.782070765s, topic = topic-chain-1, partition = 0, offset = 53
Apr 16 23:30:44 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:35 Sent message 'key-f48189fc-3bb5-4f43-aadc-6000b44166ad-1713303031709' to topic topic-chain-2
Apr 16 23:30:54 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:45 we are done, timeout expired
Apr 16 23:30:54 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:45 Marking instance 'consumer-chain-first-jobrun-54nmg-0-0' as stopped ...
Apr 16 23:30:54 Code Engine consumer-chain-first-jobrun-54nmg-0-0 2024/04/16 21:30:45 completed
Apr 16 23:30:54 Code Engine consumer-chain-third-jobrun-6k6j8-0-0 2024/04/16 21:30:47 completed
Apr 16 23:31:24 Code Engine consumer-chain-third-jobrun-6k6j8-0-0 2024/04/16 21:31:17 we are done, timeout expired
Apr 16 23:31:24 Code Engine consumer-chain-third-jobrun-6k6j8-0-0 2024/04/16 21:31:17 Marking instance 'consumer-chain-third-jobrun-6k6j8-0-0' as stopped ...
Apr 16 23:31:24 Code Engine consumer-chain-third-jobrun-6k6j8-0-0 2024/04/16 21:31:17 completed
Apr 16 23:31:46 Code Engine kafka-producer-jobrun-4l76c-0-0 Sending 8 new messages to the topic 'topic-chain-1' ...
Apr 16 23:31:46 Code Engine kafka-producer-jobrun-4l76c-0-0 sent 8 message to topic 'topic-chain-1', new offset: 54, errorCode: '0, duration: 18ms'
Apr 16 23:31:46 Code Engine kafka-producer-jobrun-4l76c-0-0 stored 8 messages in DB, duration: 6ms'
Apr 16 23:31:46 Code Engine kafka-producer-jobrun-4l76c-0-0 Pausing for 21 seconds ...
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:42 consumed offset for topic: topic-chain-1, offset: map[0:62]
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:42 topicOffset: 0/62, consumerGroupOffset(chain-consumers): 0/54
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:42 topicOffset: 0/62, consumerGroupOffset(chain-consumers): 0/54
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:42 topicOffset: 0/62, consumerGroupOffset(chain-consumers): 0/54
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 new desired array indices value: 1
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 creating jobrun for job consumer-chain-first with arrayspec 0-0
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 new desired array indices value: 1
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 creating jobrun for job consumer-chain-third with arrayspec 0-0
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 new desired array indices value: 1
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 creating jobrun for job consumer-chain-second with arrayspec 0-0
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 jobrun consumer-chain-first-jobrun-tw9xj created for job consumer-chain-first 
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 jobrun consumer-chain-third-jobrun-h9c9t created for job consumer-chain-third 
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:43 jobrun consumer-chain-second-jobrun-98vff created for job consumer-chain-second 
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:45 jobrun consumer-chain-third-jobrun-h9c9t has been scheduled
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:45 jobrun consumer-chain-first-jobrun-tw9xj has been scheduled
Apr 16 23:31:47 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:45 jobrun consumer-chain-second-jobrun-98vff has been scheduled
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:48 consumed offset for topic: topic-chain-2, offset: map[0:18]
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:48 topicOffset: 0/18, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:48 topicOffset: 0/18, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:48 consumed offset for topic: topic-chain-3, offset: map[0:11]
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:48 topicOffset: 0/11, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 consumed offset for topic: topic-chain-2, offset: map[0:25]
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 topicOffset: 0/25, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 topicOffset: 0/25, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 consumed offset for topic: topic-chain-3, offset: map[0:25]
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 topicOffset: 0/25, consumerGroupOffset(chain-consumers): 0/9
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:31:50 Code Engine ce-kafka-observer-jobrun-co9rz-0-0 2024/04/16 21:31:49 already created JobRuns instances are sufficient, nothing to do.
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:45 Connecting to DB ...
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:45 Connection to DB established!
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:45 Marking instance 'consumer-chain-first-jobrun-tw9xj-0-0' as started ...
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:45 Starting a new Sarama consumer
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:45 idle timeout of consumer set to 10 seconds
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:46 Connecting to DB ...
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:46 Connection to DB established!
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:46 Marking instance 'consumer-chain-second-jobrun-98vff-0-0' as started ...
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:46 Starting a new Sarama consumer
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:46 idle timeout of consumer set to 20 seconds
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:46 Connecting to DB ...
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:46 Connection to DB established!
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:46 Marking instance 'consumer-chain-third-jobrun-h9c9t-0-0' as started ...
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:46 Starting a new Sarama consumer
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:46 idle timeout of consumer set to 30 seconds
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:48 sarama consumer up and running!...
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:48 sarama consumer up and running!...
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:48 sarama consumer up and running!...
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:48 message claimed: key= key-f933b6a3-2b0f-44b8-9a48-75f2981b2a67-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:35.365 +0000 UTC, latency = 1m13.505169098s, topic = topic-chain-2, partition = 0, offset = 9
Apr 16 23:31:54 Code Engine consumer-chain-first-jobrun-tw9xj-0-0 2024/04/16 21:31:48 message claimed: key= key-9ba52924-89cb-4349-adf0-e727a56a883c-1713303102726, value = c412cdb3f5, timestamp = 2024-04-16 21:31:42.726 +0000 UTC, latency = 6.154893513s, topic = topic-chain-1, partition = 0, offset = 54
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:48 Sent message 'key-f933b6a3-2b0f-44b8-9a48-75f2981b2a67-1713303031709' to topic topic-chain-3
Apr 16 23:31:54 Code Engine consumer-chain-second-jobrun-98vff-0-0 2024/04/16 21:31:48 message claimed: key= key-dbc1aaa5-33f5-4e34-88b9-51ba2429a7e1-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:30:35.438 +0000 UTC, latency = 1m13.471634345s, topic = topic-chain-2, partition = 0, offset = 10
Apr 16 23:31:54 Code Engine consumer-chain-third-jobrun-h9c9t-0-0 2024/04/16 21:31:48 message claimed: key= key-f933b6a3-2b0f-44b8-9a48-75f2981b2a67-1713303031709, value = c412cdb3f5, timestamp = 2024-04-16 21:31:48.87 +0000 UTC, latency = 39.846783ms, topic = topic-chain-3, partition = 0, offset = 9

@reggeenr reggeenr dismissed their stale review April 16, 2024 21:54

I want to better understand the ticker logic, as I have a concern that there is a bug

@qu1queee
Copy link
Member Author

qu1queee commented Apr 16, 2024

@reggeenr

I want to better understand the ticker logic, as I have a concern that there is a bug

You would not always see in the logs the new messages(offset change), due to the ticker. But the latest offset change being logged should reflect the change from all previous arrived messages. This is ok, as long as a Jobrun was already running.

@reggeenr
Copy link
Collaborator

Sorry that it took so long. I finally was able to recreate a test setup. Long story short, I wasn't able to reproduce the issue that I observed once in a long running test scenario.

This was my test setup

sample-1:
  partitions: 1
  jobs:
    - name: consumer-1
      consumer_group: payments-consumer-group
    - name: consumer-2
      consumer_group: foobar-consumer-group
sample-2:
  partitions: 1
  jobs:
    - name: consumer-3
      consumer_group: shipping-consumer-group

By sending messages to both topics, I was able to observe that the corresponding consumer were woken up by the observer.
image

Hence. I am approving :)

Sorry for the delay. Great job team

Copy link
Collaborator

@reggeenr reggeenr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@reggeenr reggeenr merged commit 2cb280a into IBM:main Apr 22, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants