-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clean up #3693
Clean up #3693
Conversation
📝 WalkthroughWalkthroughThe pull request introduces significant modifications to the Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/message_broker_utils.py (3)
Line range hint
360-426
: Logical error in loop condition withinconsume_from_topic
The loop condition in the
consume_from_topic
method may not behave as expected:while streaming or (message_count < max_messages if max_messages else True):When
max_messages
isNone
andstreaming
isFalse
, the condition simplifies to:while False or True # which is TrueThis causes the loop to run indefinitely, even when
streaming
isFalse
andmax_messages
is not set. This might not be the intended behavior.Consider revising the loop condition to properly handle scenarios where
max_messages
isNone
:- while streaming or (message_count < max_messages if max_messages else True): + while streaming or (max_messages is not None and message_count < max_messages):This adjustment ensures that:
- If
streaming
isTrue
, the loop runs indefinitely.- If
max_messages
is specified, the loop runs untilmessage_count
reachesmax_messages
.- If neither
streaming
isTrue
normax_messages
is specified, the loop exits appropriately.
Line range hint
399-407
: Incorrect offset assignment when seeking to a specific offsetIn the
consume_from_topic
method, when anoffset
is provided, the code attempts to assign partitions with the current offsets instead of the specifiedoffset
:partitions = [ TopicPartition(topic, p.partition, p.offset) for p in consumer.assignment() ]Using
p.offset
does not set the partitions to the desiredoffset
. This might prevent the consumer from starting at the intended position.To resolve this, set the
offset
parameter to the providedoffset
value:if offset is not None: logger.info(f"Seeking to offset {offset} for all partitions...") partitions = [ - TopicPartition(topic, p.partition, p.offset) + TopicPartition(topic, p.partition, offset) for p in consumer.assignment() ] consumer.assign(partitions)This change ensures that the consumer begins consuming from the specified
offset
for all partitions.
Line range hint
388-395
: Redundant calls toconsumer.seek()
after assignmentWhen
from_beginning
isTrue
, the code assigns partitions withoffset=0
and then callsconsumer.seek(partition)
for each partition:consumer.assign(partitions) for partition in partitions: consumer.seek(partition)However, since the partitions have already been assigned with
offset=0
, callingconsumer.seek(partition)
without specifying an offset may overwrite the intended offset or have no effect, depending on the consumer configuration.It's more reliable to set the desired offset during assignment and omit the
seek
calls:if from_beginning: logger.info("Seeking to the beginning of all partitions...") partitions = [ TopicPartition(topic, p.partition, offset=0) for p in consumer.assignment() ] - consumer.assign(partitions) - for partition in partitions: - consumer.seek(partition) + consumer.assign(partitions)This ensures that the consumer starts from the beginning of each partition without unnecessary redundancy.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- src/workflows/airqo_etl_utils/message_broker_utils.py (1 hunks)
🧰 Additional context used
📓 Learnings (1)
src/workflows/airqo_etl_utils/message_broker_utils.py (1)
Learnt from: NicholasTurner23 PR: airqo-platform/AirQo-api#3683 File: src/workflows/airqo_etl_utils/message_broker_utils.py:271-302 Timestamp: 2024-10-17T21:49:16.098Z Learning: In the `MessageBrokerUtils` class, messages are flushed in the caller method after invoking `_send_message`, so it's not necessary to call `producer.flush()` within the `_send_message` method.
🔇 Additional comments (1)
src/workflows/airqo_etl_utils/message_broker_utils.py (1)
360-360
: Good practice: Flushing the producer to ensure all messages are sentThe addition of
producer.flush()
at the end of thepublish_to_topic
method ensures that all buffered messages are sent before the producer is closed. This guarantees that no messages are lost.
Description
Just some clean up.
Summary by CodeRabbit
New Features
Improvements