Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make long polling interval a command line parameter option #61

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pyqs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ def main():
action="store",
)

parser.add_argument(
"--long-polling-interval",
dest="long_polling_interval",
type=int,
default=10,
help='How long to poll SQS for a new message.',
action="store",
)

args = parser.parse_args()

_main(
Expand All @@ -118,7 +127,8 @@ def main():
secret_access_key=args.secret_access_key,
interval=args.interval,
batchsize=args.batchsize,
prefetch_multiplier=args.prefetch_multiplier
prefetch_multiplier=args.prefetch_multiplier,
long_polling_interval=args.long_polling_interval
)


Expand All @@ -130,16 +140,16 @@ def _add_cwd_to_path():

def _main(queue_prefixes, concurrency=5, logging_level="WARN",
region=None, access_key_id=None, secret_access_key=None,
interval=1, batchsize=10, prefetch_multiplier=2):
interval=1, batchsize=10, prefetch_multiplier=2, long_polling_interval=10):
logging.basicConfig(
format="[%(levelname)s]: %(message)s",
level=getattr(logging, logging_level),
)
logger.info("Starting PyQS version {}".format(__version__))
manager = ManagerWorker(
queue_prefixes, concurrency, interval, batchsize,
prefetch_multiplier=prefetch_multiplier, region=region,
access_key_id=access_key_id, secret_access_key=secret_access_key,
prefetch_multiplier=prefetch_multiplier, long_polling_interval=long_polling_interval,
region=region, access_key_id=access_key_id, secret_access_key=secret_access_key,
)
_add_cwd_to_path()
manager.start()
Expand Down
12 changes: 8 additions & 4 deletions pyqs/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pyqs.utils import get_aws_region_name, decode_message

MESSAGE_DOWNLOAD_BATCH_SIZE = 10
LONG_POLLING_INTERVAL = 20
DEFAULT_LONG_POLLING_INTERVAL = 10
logger = logging.getLogger("pyqs")


Expand Down Expand Up @@ -59,7 +59,7 @@ def parent_is_alive(self):

class ReadWorker(BaseWorker):

def __init__(self, queue_url, internal_queue, batchsize,
def __init__(self, queue_url, internal_queue, batchsize, long_polling_interval=DEFAULT_LONG_POLLING_INTERVAL,
connection_args=None, *args, **kwargs):
super(ReadWorker, self).__init__(*args, **kwargs)
if connection_args is None:
Expand All @@ -74,6 +74,7 @@ def __init__(self, queue_url, internal_queue, batchsize,

self.internal_queue = internal_queue
self.batchsize = batchsize
self.long_polling_interval = long_polling_interval

def run(self):
# Set the child process to not receive any keyboard interrupts
Expand All @@ -91,7 +92,7 @@ def read_message(self):
messages = self.conn.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=self.batchsize,
WaitTimeSeconds=LONG_POLLING_INTERVAL,
WaitTimeSeconds=self.long_polling_interval,
).get('Messages', [])

logger.debug(
Expand Down Expand Up @@ -236,7 +237,7 @@ def process_message(self):
class ManagerWorker(object):

def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize,
prefetch_multiplier=2, region=None, access_key_id=None,
prefetch_multiplier=2, long_polling_interval=DEFAULT_LONG_POLLING_INTERVAL, region=None, access_key_id=None,
secret_access_key=None):
self.connection_args = {
"region": region,
Expand All @@ -250,6 +251,7 @@ def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize,
self.batchsize = 1
self.interval = interval
self.prefetch_multiplier = prefetch_multiplier
self.long_polling_interval = long_polling_interval
self.load_queue_prefixes(queue_prefixes)
self.queue_urls = self.get_queue_urls_from_queue_prefixes(
self.queue_prefixes)
Expand All @@ -272,6 +274,7 @@ def _initialize_reader_children(self):
self.reader_children.append(
ReadWorker(
queue_url, self.internal_queue, self.batchsize,
long_polling_interval=self.long_polling_interval,
connection_args=self.connection_args,
parent_id=self._pid,
)
Expand Down Expand Up @@ -370,6 +373,7 @@ def _replace_reader_children(self):
self.reader_children.pop(index)
worker = ReadWorker(
queue_url, self.internal_queue, self.batchsize,
long_polling_interval=self.long_polling_interval,
connection_args=self.connection_args,
parent_id=self._pid,
)
Expand Down
9 changes: 5 additions & 4 deletions tests/test_manager_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def test_main_method(ManagerWorker):

ManagerWorker.assert_called_once_with(
['email1', 'email2'], 2, 1, 10, prefetch_multiplier=2,
long_polling_interval=10,
region=None, secret_access_key=None, access_key_id=None,
)
ManagerWorker.return_value.start.assert_called_once_with()
Expand All @@ -116,14 +117,15 @@ def test_real_main_method(ArgumentParser, _main):
ArgumentParser.return_value.parse_args.return_value = Mock(
concurrency=3, queues=["email1"], interval=1, batchsize=10,
logging_level="WARN", region='us-east-1', prefetch_multiplier=2,
long_polling_interval=3,
access_key_id=None, secret_access_key=None,
)
main()

_main.assert_called_once_with(
queue_prefixes=['email1'], concurrency=3, interval=1, batchsize=10,
logging_level="WARN", region='us-east-1', prefetch_multiplier=2,
access_key_id=None, secret_access_key=None,
long_polling_interval=3, logging_level="WARN", region='us-east-1',
prefetch_multiplier=2, access_key_id=None, secret_access_key=None,
)


Expand Down Expand Up @@ -288,7 +290,6 @@ def process_counts():
sys.exit.assert_called_once_with(0)


@patch("pyqs.worker.LONG_POLLING_INTERVAL", 3)
@mock_sqs
@mock_sqs_deprecated
def test_master_shuts_down_busy_read_workers():
Expand Down Expand Up @@ -338,7 +339,7 @@ def sleep_and_kill(pid):
# Setup Manager
manager = ManagerWorker(
queue_prefixes=["tester"], worker_concurrency=0, interval=0.0,
batchsize=1,
batchsize=1, long_polling_interval=3
)
manager.start()

Expand Down