Skip to content

Commit

Permalink
Update version
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Dec 10, 2024
1 parent 8064305 commit cc30c4b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "queue-processor"
version = "2024.12.2.1"
version = "2024.12.10.1"
description = "Manage queues on Uwazi services"
license = { file = "LICENSE" }
authors = [{ name = "HURIDOCS" }]
Expand Down
30 changes: 11 additions & 19 deletions src/queue_processor/QueueProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,28 @@ def create_queues(self):
self.queue_processor_logger.info(f"Creating queue {queue_name}")
self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute()

def start(self, process: callable, restart_condition: Callable = None):
def start(self, process: callable, hide_message_seconds: int = 0):
self.queue_processor_logger.info("QueueProcessor running")
while True:
restart = False
for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
try:
self.create_queues()
task_queue = self.get_queue(task_queue_name)
message = task_queue.receiveMessage().execute()
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
message = utils.decode_message(message["message"])
results = process(message)
message = task_queue.receiveMessage(vt=hide_message_seconds if hide_message_seconds else None).execute()

if hide_message_seconds == 0:
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()

results = process(utils.decode_message(message["message"]))

if not results:
continue

self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message(
results
).execute()
if hide_message_seconds:
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()

try:
restart = restart_condition(message)
except:
pass

break
results_queue = self.get_queue(results_queue_name)
results_queue.sendMessage(delay=self.delay_time_for_results).message(results).execute()

except NoMessageInQueue:
sleep(2)
Expand All @@ -83,7 +79,3 @@ def start(self, process: callable, restart_condition: Callable = None):
self.exists_queues = False
self.queue_processor_logger.error(f"Error: {e}", exc_info=True)
sleep(60)

if restart:
sleep(self.delay_time_for_results + 5)
break

0 comments on commit cc30c4b

Please sign in to comment.