Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop consume functionality #193

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Adarsh-jaiss
Copy link
Contributor

I have created a stopConsume method with self parameter, refefered in the issue #139

def stopConsume(self):
    if self.t_consume is not None:
        self.t_consume.cancel()
        self.t_consume = None

In this code :

  • The stopConsume method is added to the Consumer class. This method will cancel the task responsible for consuming messages, effectively stopping the automatic message consumption.

  • In the __consume method, we added a condition to check whether the consumer task (t_consume) is still running. If not, it will break out of the loop, ensuring that the consumption stops when stopConsume is called.

memphis/consumer.py Outdated Show resolved Hide resolved
@idanasulin2706
Copy link
Contributor

There are also a few lint issues

Copy link
Contributor

@rnowling-memphis rnowling-memphis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Adarsh-jaiss How did you test this? Can you please describe your test plan?

memphis/consumer.py Outdated Show resolved Hide resolved
@Adarsh-jaiss
Copy link
Contributor Author

@Adarsh-jaiss How did you test this? Can you please describe your test plan?

As for the test plan of this stop_consume method, we can follow this approach:

  1. We can create an Instance of consumer class and define a callback function that will be used in the consume method.
  2. Now we can call the consume method with the callback function to start consuming events.
  3. Now call the stop_consume method to stop the consumption of events and verify that the consumption has stopped by checking if the callback function is no longer being invoked.

Here's the unittest file:

import unittest
from memphis.consumer import Consumer

class ConsumerTestCase(unittest.TestCase):
    def test_stopConsume(self):
        consumer = Consumer(connection=None, station_name='my_station', consumer_name='my_consumer', consumer_group='my_group', pull_interval_ms=1000, batch_size=100, batch_max_time_to_wait_ms=5000, max_ack_time_ms=1000)
        
        # Define a callback function
        def callback(messages, error, context):
            pass
        
        # Start consuming events
        consumer.consume(callback)
        
        # Stop consuming events
        consumer.stopConsume()
        
        # Verify that the consumption has stopped
        # You can add your own assertions here
        
        # Optionally, verify that the t_consume task is None
        self.assertIsNone(consumer.t_consume)

if __name__ == '__main__':
    unittest.main()

Please do correct me, and tell me the suggested changes since i am also a beginner and trying to learning new stuff by doing :)

@rnowling-memphis
Copy link
Contributor

@Adarsh-jaiss Did you run the unit test you wrote? Did it run successfully? The best way to learn is to try things -- you won't break anything. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants