Skip to content

Commit

Permalink
Merge pull request mediacloud#266 from philbudne/qfetch-later
Browse files Browse the repository at this point in the history
Queue fetcher updates
  • Loading branch information
philbudne authored Mar 20, 2024
2 parents b461e30 + fa55202 commit a7a8a8e
Show file tree
Hide file tree
Showing 9 changed files with 598 additions and 382 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repos:
- id: black
language_version: python3.10
- repo: http://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
rev: v1.9.0
hooks:
- id: mypy
entry: bin/pre-commit-wrapper.py mypy
Expand Down
26 changes: 16 additions & 10 deletions indexer/storyapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import argparse
import logging
import multiprocessing
import os
import queue
import sys
Expand Down Expand Up @@ -206,15 +207,22 @@ def __init__(self, process_name: str, descr: str):
# and avoid possible (if unlikely) surprise later.
self.senders: Dict[BlockingChannel, StorySender] = {}

def process_message(self, im: InputMessage) -> None:
chan = im.channel
if chan in self.senders:
sender = self.senders[chan]
else:
def decode_story(self, im: InputMessage) -> BaseStory:
story = BaseStory.load(im.body)
assert isinstance(story, BaseStory)
return story

def _story_sender(self, chan: BlockingChannel) -> StorySender:
sender = self.senders.get(chan)
if not sender:
sender = self.senders[chan] = StorySender(self, chan)
return sender

def process_message(self, im: InputMessage) -> None:
sender = self._story_sender(im.channel)

# raised exceptions will cause retry; quarantine immediately?
story = BaseStory.load(im.body)
story = self.decode_story(im)

self.process_story(sender, story)

Expand Down Expand Up @@ -369,10 +377,8 @@ def end_of_batch(self) -> None:
class MultiThreadStoryWorker(IntervalMixin, StoryWorker):
# include thread name in log message format
LOG_FORMAT = "thread"

# subclass must set value!
# (else will see AttributeError)
WORKER_THREADS_DEFAULT: int
CPU_COUNT = multiprocessing.cpu_count()
WORKER_THREADS_DEFAULT = CPU_COUNT

def __init__(self, process_name: str, descr: str):
super().__init__(process_name, descr)
Expand Down
54 changes: 39 additions & 15 deletions indexer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def start_pika_thread(self) -> None:
return

self._pika_thread = threading.Thread(
target=self._pika_thread_body, name="Pika-thread", daemon=True
target=self._pika_thread_body, name="Pika", daemon=True
)
self._pika_thread.start()

Expand Down Expand Up @@ -410,11 +410,8 @@ def _pika_thread_body(self) -> None:
logger.info("Pika thread exiting")

def _call_in_pika_thread(self, cb: Callable[[], None]) -> None:
assert self.connection

# XXX this will need a lock if app runs in multiple threads
if self._pika_thread is None:
# here from a QApp
# here from a QApp in Main thread
# transactions will NOT be enabled
# (unless _subscribe is overridden)
self.start_pika_thread()
Expand All @@ -428,6 +425,7 @@ def _call_in_pika_thread(self, cb: Callable[[], None]) -> None:

# NOTE! add_callback_threadsafe is documented (in the Pika
# 1.3.2 comments) as the ONLY thread-safe connection method!!!
assert self.connection # checked above
self.connection.add_callback_threadsafe(cb)

def _stop_pika_thread(self) -> None:
Expand Down Expand Up @@ -565,8 +563,17 @@ def _on_message(
"""
im = InputMessage(chan, method, properties, body, time.monotonic())
msglogger.debug("on_message tag #%s", method.delivery_tag)
self._on_input_message(im)

def _on_input_message(self, im: InputMessage) -> None:
"""
called in Pika thread.
override to interrupt direct delivery of messages to _message_queue
"""
self._message_queue.put(im)

_put_message_queue = _on_input_message

def _subscribe(self) -> None:
"""
Called from Pika thread with newly opened connection.
Expand Down Expand Up @@ -623,9 +630,9 @@ def _process_one_message(self, im: InputMessage) -> bool:
except QuarantineException as e:
status = "error"
self._quarantine(im, e)
except RequeueException as e:
except RequeueException:
status = "requeue"
self._requeue(im, e)
self._requeue(im)
except Exception as e:
if self._crash_on_exception:
raise # for debug
Expand Down Expand Up @@ -655,18 +662,25 @@ def _ack_and_commit(self, im: InputMessage, multiple: bool = False) -> None:
This avoids using functools.partial, which I find less
illustrative of a function call with captured values. -phil
"""
tag = im.method.delivery_tag # tag from last message
assert tag is not None

def acker() -> None:
msglogger.debug("ack and commit #%s", tag)
self._pika_ack_and_commit(im, multiple)

im.channel.basic_ack(delivery_tag=tag, multiple=multiple)
self._call_in_pika_thread(acker)

# AFTER basic_ack!
im.channel.tx_commit() # commit sent messages and ack atomically!
def _pika_ack_and_commit(self, im: InputMessage, multiple: bool = False) -> None:
"""
call ONLY from pika thread!!
"""
tag = im.method.delivery_tag # tag from last message
assert tag is not None

self._call_in_pika_thread(acker)
chan = im.channel

msglogger.debug("ack and commit #%s", tag)
chan.basic_ack(delivery_tag=tag, multiple=multiple)
# AFTER basic_ack!
chan.tx_commit() # commit sent messages and ack atomically!

def _exc_headers(self, e: Exception) -> Dict:
"""
Expand Down Expand Up @@ -774,7 +788,7 @@ def _retry(self, im: InputMessage, e: Exception) -> bool:
def set_requeue_delay_ms(self, ms: int) -> None:
self.requeue_delay_str = str(int(ms))

def _requeue(self, im: InputMessage, e: Exception) -> bool:
def _requeue(self, im: InputMessage) -> bool:
"""
Requeue message to -fast queue, which has no consumers, with
an expiration/TTL; when messages expire, they are routed
Expand All @@ -797,6 +811,16 @@ def _requeue(self, im: InputMessage, e: Exception) -> bool:
)
return True # requeued

def message_queue_len(self) -> int:
"""
NOTE! the underlying qsize method is described as "unreliable"
USE ONLY FOR LOGGING/STATS!!
"""
if self._message_queue:
return self._message_queue.qsize()
else:
return 0

def process_message(self, im: InputMessage) -> None:
raise NotImplementedError("Worker.process_message not overridden")

Expand Down
Loading

0 comments on commit a7a8a8e

Please sign in to comment.