diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 05668f8f..dc566769 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,7 +16,7 @@ repos: - id: black language_version: python3.10 - repo: http://github.com/pre-commit/mirrors-mypy - rev: v1.8.0 + rev: v1.9.0 hooks: - id: mypy entry: bin/pre-commit-wrapper.py mypy diff --git a/indexer/storyapp.py b/indexer/storyapp.py index d870b4f3..324336f3 100644 --- a/indexer/storyapp.py +++ b/indexer/storyapp.py @@ -10,6 +10,7 @@ import argparse import logging +import multiprocessing import os import queue import sys @@ -206,15 +207,22 @@ def __init__(self, process_name: str, descr: str): # and avoid possible (if unlikely) surprise later. self.senders: Dict[BlockingChannel, StorySender] = {} - def process_message(self, im: InputMessage) -> None: - chan = im.channel - if chan in self.senders: - sender = self.senders[chan] - else: + def decode_story(self, im: InputMessage) -> BaseStory: + story = BaseStory.load(im.body) + assert isinstance(story, BaseStory) + return story + + def _story_sender(self, chan: BlockingChannel) -> StorySender: + sender = self.senders.get(chan) + if not sender: sender = self.senders[chan] = StorySender(self, chan) + return sender + + def process_message(self, im: InputMessage) -> None: + sender = self._story_sender(im.channel) # raised exceptions will cause retry; quarantine immediately? - story = BaseStory.load(im.body) + story = self.decode_story(im) self.process_story(sender, story) @@ -369,10 +377,8 @@ def end_of_batch(self) -> None: class MultiThreadStoryWorker(IntervalMixin, StoryWorker): # include thread name in log message format LOG_FORMAT = "thread" - - # subclass must set value! - # (else will see AttributeError) - WORKER_THREADS_DEFAULT: int + CPU_COUNT = multiprocessing.cpu_count() + WORKER_THREADS_DEFAULT = CPU_COUNT def __init__(self, process_name: str, descr: str): super().__init__(process_name, descr) diff --git a/indexer/worker.py b/indexer/worker.py index 22fc2134..b49afa11 100644 --- a/indexer/worker.py +++ b/indexer/worker.py @@ -349,7 +349,7 @@ def start_pika_thread(self) -> None: return self._pika_thread = threading.Thread( - target=self._pika_thread_body, name="Pika-thread", daemon=True + target=self._pika_thread_body, name="Pika", daemon=True ) self._pika_thread.start() @@ -410,11 +410,8 @@ def _pika_thread_body(self) -> None: logger.info("Pika thread exiting") def _call_in_pika_thread(self, cb: Callable[[], None]) -> None: - assert self.connection - - # XXX this will need a lock if app runs in multiple threads if self._pika_thread is None: - # here from a QApp + # here from a QApp in Main thread # transactions will NOT be enabled # (unless _subscribe is overridden) self.start_pika_thread() @@ -428,6 +425,7 @@ def _call_in_pika_thread(self, cb: Callable[[], None]) -> None: # NOTE! add_callback_threadsafe is documented (in the Pika # 1.3.2 comments) as the ONLY thread-safe connection method!!! + assert self.connection # checked above self.connection.add_callback_threadsafe(cb) def _stop_pika_thread(self) -> None: @@ -565,8 +563,17 @@ def _on_message( """ im = InputMessage(chan, method, properties, body, time.monotonic()) msglogger.debug("on_message tag #%s", method.delivery_tag) + self._on_input_message(im) + + def _on_input_message(self, im: InputMessage) -> None: + """ + called in Pika thread. 
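# A minimal, self-contained sketch (not project code) of the hand-off pattern
# _call_in_pika_thread relies on: worker threads never touch the connection or
# channel directly; they wrap the channel operation in a closure and hand it to
# the Pika I/O thread via add_callback_threadsafe, the one connection method
# pika documents as thread-safe. The queue name and function name below are
# illustrative assumptions only.
from pika import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel

def publish_from_worker(conn: BlockingConnection, chan: BlockingChannel, body: bytes) -> None:
    def cb() -> None:
        # runs in the connection's own (Pika) thread, so channel use is safe here
        chan.basic_publish(exchange="", routing_key="example-out", body=body)

    conn.add_callback_threadsafe(cb)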
+ override to interrupt direct delivery of messages to _message_queue + """ self._message_queue.put(im) + _put_message_queue = _on_input_message + def _subscribe(self) -> None: """ Called from Pika thread with newly opened connection. @@ -623,9 +630,9 @@ def _process_one_message(self, im: InputMessage) -> bool: except QuarantineException as e: status = "error" self._quarantine(im, e) - except RequeueException as e: + except RequeueException: status = "requeue" - self._requeue(im, e) + self._requeue(im) except Exception as e: if self._crash_on_exception: raise # for debug @@ -655,18 +662,25 @@ def _ack_and_commit(self, im: InputMessage, multiple: bool = False) -> None: This avoids using functools.partial, which I find less illustrative of a function call with captured values. -phil """ - tag = im.method.delivery_tag # tag from last message - assert tag is not None def acker() -> None: - msglogger.debug("ack and commit #%s", tag) + self._pika_ack_and_commit(im, multiple) - im.channel.basic_ack(delivery_tag=tag, multiple=multiple) + self._call_in_pika_thread(acker) - # AFTER basic_ack! - im.channel.tx_commit() # commit sent messages and ack atomically! + def _pika_ack_and_commit(self, im: InputMessage, multiple: bool = False) -> None: + """ + call ONLY from pika thread!! + """ + tag = im.method.delivery_tag # tag from last message + assert tag is not None - self._call_in_pika_thread(acker) + chan = im.channel + + msglogger.debug("ack and commit #%s", tag) + chan.basic_ack(delivery_tag=tag, multiple=multiple) + # AFTER basic_ack! + chan.tx_commit() # commit sent messages and ack atomically! def _exc_headers(self, e: Exception) -> Dict: """ @@ -774,7 +788,7 @@ def _retry(self, im: InputMessage, e: Exception) -> bool: def set_requeue_delay_ms(self, ms: int) -> None: self.requeue_delay_str = str(int(ms)) - def _requeue(self, im: InputMessage, e: Exception) -> bool: + def _requeue(self, im: InputMessage) -> bool: """ Requeue message to -fast queue, which has no consumers, with an expiration/TTL; when messages expire, they are routed @@ -797,6 +811,16 @@ def _requeue(self, im: InputMessage, e: Exception) -> bool: ) return True # requeued + def message_queue_len(self) -> int: + """ + NOTE! the underlying qsize method is described as "unreliable" + USE ONLY FOR LOGGING/STATS!! + """ + if self._message_queue: + return self._message_queue.qsize() + else: + return 0 + def process_message(self, im: InputMessage) -> None: raise NotImplementedError("Worker.process_message not overridden") diff --git a/indexer/workers/fetcher/rss-queuer.py b/indexer/workers/fetcher/rss-queuer.py index 387a050b..121335c5 100644 --- a/indexer/workers/fetcher/rss-queuer.py +++ b/indexer/workers/fetcher/rss-queuer.py @@ -8,7 +8,8 @@ Queuer framework handles local files, http/https/s3 URLs, transparently un-gzips. -On-the-fly XML parsing using "SAX" parser. +On-the-fly XML parsing using iterparse +(w/o reading whole file into memory) NOTE! 
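# A minimal sketch of the streaming-parse pattern used below, separate from the
# Queuer class machinery. Only iterparse() and Element.clear() are relied on;
# the file name and flat <item> layout are illustrative assumptions.
# recover=True keeps the parse alive when element text contains stray control
# characters, which is why lxml is used instead of xml.etree.ElementTree.
from lxml.etree import iterparse

def count_items(path: str) -> int:
    items = 0
    for _event, elem in iterparse(path, events=("end",), recover=True):
        if elem.tag == "item":
            items += 1
        elem.clear()  # discard parsed subtrees so memory stays bounded
    return items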
--yesterday only valid after 01:00 GMT (before that, gets the day before) @@ -18,8 +19,11 @@ import html import logging import time -import xml.sax -from typing import BinaryIO, List, Optional +from typing import BinaryIO + +# xml.etree.ElementTree.iterparse doesn't have recover argument +# (fatal error on control characters in element text), so using lxml +from lxml.etree import iterparse from indexer.app import run from indexer.queuer import Queuer @@ -27,120 +31,34 @@ S3_URL_BASE = "https://mediacloud-public.s3.amazonaws.com/backup-daily-rss" -Attrs = xml.sax.xmlreader.AttributesImpl - Story = StoryFactory() logger = logging.getLogger("queue-rss") -def optional_int(input: Optional[str]) -> Optional[int]: +def optional_int(input: str | None) -> int | None: if not input or not input.isdigit(): return None return int(input) -class RSSHandler(xml.sax.ContentHandler): - link: str # required! - domain: Optional[str] - pub_date: Optional[str] - title: Optional[str] - source_url: Optional[str] - source_feed_id: Optional[int] - source_source_id: Optional[int] - content: List[str] - - def __init__(self, app: "RSSQueuer", fname: str): - self.app = app - self.parsed = self.bad = 0 - self.in_item = False - self.file_name = fname - self.reset_item() - self.content = [] - - def reset_item(self) -> None: - self.link = "" - self.domain = "" - self.pub_date = "" - self.title = "" - self.source_url = None - self.source_feed_id = None - self.source_source_id = None - - def startElement(self, name: str, attrs: Attrs) -> None: - if name == "item": - # error if in_item is True (missing end element?) - self.in_item = True - elif self.in_item: - if name == "source": - self.source_url = attrs.get("url") - self.source_feed_id = optional_int(attrs.get("mcFeedId")) - self.source_source_id = optional_int(attrs.get("mcSourceId")) - self.content = [] # DOES NOT WORK FOR NESTED TAGS! - - def characters(self, content: str) -> None: - """ - handle text content inside current tag; - may come in multiple calls!! - DOES NOT WORK FOR NESTED TAGS!! - (would need a stack pushed by startElement, popped by endElement) - """ - self.content.append(content) - - def endElement(self, name: str): # type: ignore[no-untyped-def] - if not self.in_item: - return - - # here at end of a tag inside an - - # join bits of tag content together - # DOES NOT WORK FOR NESTED TAGS!! - content = "".join(self.content) - - if name == "link": - # undo HTML entity escapes: - self.link = html.unescape(content).strip() - elif name == "domain": - self.domain = content.strip() or None - elif name == "pubDate": - self.pub_date = content.strip() or None - elif name == "title": - self.title = content.strip() or None - elif name == "item": - # domain not required by queue-based fetcher - if self.link: - s = Story() - # mypy reval_type(rss) in "with s.rss_entry() as rss" gives Any!! 
- rss = s.rss_entry() - with rss: - rss.link = self.link - rss.domain = self.domain - rss.pub_date = self.pub_date - rss.title = self.title - rss.source_url = self.source_url - rss.source_feed_id = self.source_feed_id - rss.source_source_id = self.source_source_id - rss.via = self.file_name # instead of fetch_date - self.app.send_story(s) - self.reset_item() - self.parsed += 1 - else: - assert self.app.args - # don't muddy the water if just a dry-run: - if not self.app.args.dry_run: - self.app.incr_stories("bad", self.link) - self.bad += 1 - self.in_item = False - - class RSSQueuer(Queuer): HANDLE_GZIP = True # transparently handle .gz files def __init__(self, process_name: str, descr: str): super().__init__(process_name, descr) - - self.sample_size: Optional[int] = None - self.dry_run = False + self.reset_item(True) + + def reset_item(self, first: bool = False) -> None: + if first: + self.ok = self.bad = 0 + self.link: str | None = "" + self.domain: str | None = "" + self.pub_date: str | None = "" + self.title: str | None = "" + self.source_url: str | None = None + self.source_feed_id: int | None = None + self.source_source_id: int | None = None def define_options(self, ap: argparse.ArgumentParser) -> None: super().define_options(ap) @@ -209,15 +127,85 @@ def add_previous(days: int) -> None: for days in range(1, args.days + 1): add_previous(days) + def end_item(self, fname: str) -> None: + # domain not required by queue-based fetcher + if self.link: + s = Story() + # mypy reval_type(rss) in "with s.rss_entry() as rss" gives Any!! + rss = s.rss_entry() + with rss: + rss.link = self.link + rss.domain = self.domain + rss.pub_date = self.pub_date + rss.title = self.title + rss.source_url = self.source_url + rss.source_feed_id = self.source_feed_id + rss.source_source_id = self.source_source_id + rss.via = fname # instead of fetch_date + self.send_story(s) + self.ok += 1 + else: + assert self.args + # don't muddy the water if just a dry-run: + if not self.args.dry_run: + self.incr_stories("bad", self.link or "no-link") + self.bad += 1 + self.reset_item() + def process_file(self, fname: str, fobj: BinaryIO) -> None: """ - called for each file/url on command line, and those - implied by --fetch-date, --days and --yesterday + called for each file/url on command line, + each file in a directory on the command line, + each S3 object matching an s3 URL prefix, + and URLs implied by --fetch-date, --days and --yesterday with an uncompressed (binary) byte stream """ - handler = RSSHandler(self, fname) - xml.sax.parse(fobj, handler) - logger.info("processed %s: %d ok, %d bad", fname, handler.parsed, handler.bad) + path: list[str] = [] + self.reset_item(True) + + # recover=True avoids fatal error when control characters seen in title + for event, element in iterparse(fobj, events=("start", "end"), recover=True): + name = element.tag + logger.debug("%s tag %s level %d", event, name, len(path)) + if event == "start": + path.append(name) + else: # event == "end" + path.pop() + if name == "item": + self.end_item(fname) + continue + + lpath = len(path) + if ( + (lpath > 0 and path[0] != "rss") + or (lpath > 1 and path[1] != "channel") + or (lpath > 2 and path[2] != "item") + or lpath > 3 + ): + logger.warning( + "%s: unexpected tag %s path %s", + name, + "/".join(path), + ) + continue + + # here at end of a tag inside an + content = element.text or "" + assert isinstance(content, str) + if name == "link": + # undo HTML entity escapes: + self.link = html.unescape(content).strip() + elif name == "domain": + 
self.domain = content.strip() or None + elif name == "pubDate": + self.pub_date = content.strip() or None + elif name == "title": + self.title = content.strip() or None + elif name == "source": + self.source_url = element.get("url") + self.source_feed_id = optional_int(element.get("mcFeedId")) + self.source_source_id = optional_int(element.get("mcSourceId")) + logger.info("processed %s: %d ok, %d bad", fname, self.ok, self.bad) if __name__ == "__main__": diff --git a/indexer/workers/fetcher/sched.py b/indexer/workers/fetcher/sched.py index 95919970..0f7c0d3a 100644 --- a/indexer/workers/fetcher/sched.py +++ b/indexer/workers/fetcher/sched.py @@ -14,35 +14,19 @@ from enum import Enum from typing import Any, Callable, Dict, List, NamedTuple, NoReturn, Optional, Type -from indexer.app import App - # number of seconds after start of last request to keep idle slot around -# (maintains request RTT) +# if no active/delayed requests and no errors (maintains request RTT) SLOT_RECENT_MINUTES = 5 -# exponential moving average coefficient for avg_seconds. -# (typ. used by TCP for RTT est) -# https://en.wikipedia.org/wiki/Exponential_smoothing -ALPHA = 0.25 +# exponentially decayed moving average coefficient for avg_seconds. +# (used in TCP for RTT averaging, and in system load averages) +# see https://en.wikipedia.org/wiki/Exponential_smoothing +# Scrapy essentially uses 0.5: (old_avg + sample) / 2 +ALPHA = 0.25 # applied to new samples logger = logging.getLogger(__name__) -# _could_ try and map Slots by IP address(es), since THAT gets us closer -# to the point (of not hammering a particular server); -# -# BUT: Would have to deal with: -# 1. A particular FQDN may map to multiple IP addrs -# 2. The order of the IP addreses often changes between queries -# (so doing a lookup first, then connecting by name -# will likely give different first answers) -# 3. The ENTIRE SET might change if a CDN is involved! -# 4. Whether or not we're using IPv6 (if not, can ignore IPv6) -# 5. IP addresses can serve multiple domains -# 6. But domains served by shared servers (see #5) -# might have disjoint IP addr sets. - - class LockError(RuntimeError): """ base class for locking exceptions @@ -158,14 +142,39 @@ def __str__(self) -> str: return f"{self.elapsed():.3f}" -class IssueStatus(Enum): +class Alarm: """ - return value from Slot._issue + time until a future event (born expired) """ - OK = 0 # slot assigned - BUSY = 1 # too many fetches active or too soon - SKIPPED = 2 # recent connection error + def __init__(self) -> None: + self.time = 0.0 + + def set(self, delay: float) -> None: + """ + if unexpired, extend expiration by delay seconds; + else set expiration to delay seconds in future + """ + now = time.monotonic() + if self.time > now: + self.time += delay + else: + self.time = now + delay + + def delay(self) -> float: + """ + return seconds until alarm expires + """ + return self.time - time.monotonic() + + def __str__(self) -> str: + """ + used in log messages! 
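# A small usage sketch of the Alarm class above (delay values are arbitrary).
# It shows the two properties the comments describe: an Alarm is born expired,
# and set() on an unexpired alarm extends the expiration rather than restarting
# it, so repeated set(i) calls space future events i seconds apart.
from indexer.workers.fetcher.sched import Alarm

alarm = Alarm()
assert alarm.delay() <= 0      # born expired; str(alarm) gives "READY"
alarm.set(2.0)                 # expires roughly 2 seconds from now
alarm.set(2.0)                 # still unexpired, so expiration moves to ~4s out
assert 3.9 < alarm.delay() <= 4.0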
+ """ + delay = self.delay() + if delay >= 0: + return "%.2f" % delay + return "READY" class ConnStatus(Enum): @@ -179,6 +188,20 @@ class ConnStatus(Enum): DATA = 1 +class StartStatus(Enum): + """ + status from start() + """ + + OK = 1 # ok to start + SKIP = 2 # recently reported down + BUSY = 3 # currently busy + + +DELAY_SKIP = -1.0 # recent attempt failed +DELAY_LONG = -2.0 # delay to long to hold + + class Slot: """ A slot for a single id (eg domain) within a ScoreBoard @@ -188,97 +211,139 @@ def __init__(self, slot_id: str, sb: "ScoreBoard"): self.slot_id = slot_id # ie; domain self.sb = sb - self.active_count = 0 - self.last_issue = Timer(SLOT_RECENT_MINUTES * 60) - # time since last error at this workplace + # time since last error at this workplace: self.last_conn_error = Timer(sb.conn_retry_seconds) + self.avg_seconds = 0.0 # smoothed average - self.issue_interval = 0.0 + self.issue_interval = sb.min_interval_seconds + + self.last_start = Timer(SLOT_RECENT_MINUTES) + + self.next_issue = Alarm() + self.delayed = 0 + self.active = 0 # O(n) removal, only used for debug_info # unclear if using a Set would be better or not... self.active_threads: List[str] = [] - def _issue(self) -> IssueStatus: + def _get_delay(self) -> float: """ - return True if safe to issue (must call "retire" after) - return False if cannot be issued now + return delay in seconds until fetch can begin. + or value < 0 (DELAY_{SKIP,LONG}) """ + # NOTE! Lock held: avoid logging! self.sb.big_lock.assert_held() - # scrapy issue interval is avg_latency / concurrency - # goal is to keep "concurrency" requests active - if self.avg_seconds == 0: # no running average yet. - # issue up to concurrency limit requests: - if self.active_count >= self.sb.target_concurrency: - return IssueStatus.BUSY - else: # have running average of request times. - elapsed = self.last_issue.elapsed() - if elapsed < self.issue_interval: - # WISH: return delta, for second try sleep time?? - return IssueStatus.BUSY + # see if connection to domain failed "recently". + if not self.last_conn_error.expired(): + return DELAY_SKIP + + delay = self.next_issue.delay() + if delay > self.sb.max_delay_seconds: + return DELAY_LONG + + if delay < 0: + # this site/slot clear to issue: consider some sort of + # rate limit here to avoid filling up _message_queue when + # incomming requests are well mixed (100 requests to 100 + # sites will go right to the message queue)! Could have a + # scoreboard wide "next_issue" clock?? + delay = 0.0 # never issued or past due + + self.next_issue.set(self.issue_interval) + self.delayed += 1 + self.sb.delayed += 1 + return delay + + def _start(self) -> StartStatus: + """ + Here in a worker thread, convert from delayed to active + returns False if connection failed recently + """ + # NOTE! Lock held: avoid logging! + self.sb.big_lock.assert_held() + + self.delayed -= 1 + self.sb.delayed -= 1 # see if connection to domain failed "recently". # last test so that preference is short delay # (and hope an active fetch succeeds). 
if not self.last_conn_error.expired(): - return IssueStatus.SKIPPED + return StartStatus.SKIP - self.active_count += 1 - self.last_issue.reset() + if self.active >= self.sb.target_concurrency: + return StartStatus.BUSY + + self.active += 1 + self.last_start.reset() self.active_threads.append(threading.current_thread().name) - return IssueStatus.OK + return StartStatus.OK - def retire(self, conn_status: ConnStatus, sec: float) -> None: + def finish(self, conn_status: ConnStatus, sec: float) -> None: """ called when a fetch attempt has ended. """ with self.sb.big_lock: # NOTE! Avoid logging while holding lock!!! - assert self.active_count > 0 - self.active_count -= 1 - # remove on list is O(n), but n is small (concurrency limit) + assert self.active > 0 + self.active -= 1 + # remove on list is O(n), but n is small (concurrency target) self.active_threads.remove(threading.current_thread().name) oavg = self.avg_seconds if conn_status == ConnStatus.NOCONN: self.last_conn_error.reset() + # discard avg connection time estimate: + self.avg_seconds = 0 elif conn_status == ConnStatus.DATA: if self.avg_seconds == 0: self.avg_seconds = sec else: - # exponentially moving average (typ. used by TCP for RTT est) - # https://en.wikipedia.org/wiki/Exponential_smoothing - self.avg_seconds += (sec - self.avg_seconds) * ALPHA + if sec > self.avg_seconds: + # per scrapy: adopt larger values directly + self.avg_seconds = sec + else: + # exponentially decayed moving average + # see comments on ALPHA declaration above. + self.avg_seconds += (sec - self.avg_seconds) * ALPHA + + # note: the above is a simplification of: + # avg = ALPHA * new + BETA * avg + # where ALPHA + BETA == 1.0, or BETA = 1.0 - ALPHA + # => avg = new * ALPHA + (1 - ALPHA) * avg + # => avg = new * ALPHA + avg - avg * ALPHA + # => avg = (new - avg) * ALPHA + avg + # => avg += (new - avg) * ALPHA elif conn_status == ConnStatus.NODATA: # got connection but no data # better to have some estimate of connection average time than none if self.avg_seconds == 0: self.avg_seconds = sec if self.avg_seconds != oavg: - self.issue_interval = self.avg_seconds / self.sb.target_concurrency + # average success time has changed, update issue interval + interval = self.avg_seconds / self.sb.target_concurrency + if interval < self.sb.min_interval_seconds: + interval = self.sb.min_interval_seconds + self.issue_interval = interval # adjust scoreboard counters - self.sb._slot_retired(self.active_count == 0) + self.sb._slot_finished(self.active == 0) def _consider_removing(self) -> None: self.sb.big_lock.assert_held() # PARANOIA if ( - self.active_count == 0 - and self.last_issue.expired() + self.active == 0 + and self.delayed == 0 + and self.last_start.expired() and self.last_conn_error.expired() ): self.sb._remove_slot(self.slot_id) -# status/value tuple: popular in GoLang -class IssueReturn(NamedTuple): - status: IssueStatus - slot: Optional[Slot] # if status == OK - - TS_IDLE = "idle" @@ -287,19 +352,41 @@ class ThreadStatus: ts: float # time.monotonic +class StartRet(NamedTuple): + """ + return value from start() + """ + + status: StartStatus + slot: Optional[Slot] + + +class Stats(NamedTuple): + """ + statistics returned by periodic() + """ + + slots: int + active_fetches: int + active_slots: int + delayed: int + + class ScoreBoard: """ keep score of active requests by "id" (str) """ + # arguments changed often in development, + # so all must be passed by keyword def __init__( self, - app: App, # for stats - max_active: int, # total concurrent active limit + *, 
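# Worked example (illustrative numbers) of the smoothing and interval update in
# finish() above. With ALPHA = 0.25, avg_seconds = 1.0 and a faster sample of
# 0.6s: avg += (0.6 - 1.0) * 0.25 -> 0.9, which is exactly 0.25 * 0.6 + 0.75 * 1.0.
# A slower sample, say 2.0s, is adopted outright (avg = 2.0) per the
# "adopt larger values directly" rule borrowed from scrapy, so the estimate
# reacts quickly to slowdowns and decays gently as the site speeds up again.
# The issue interval then becomes max(avg_seconds / target_concurrency,
# min_interval_seconds): e.g. a 2.0s average with target_concurrency 10 gives
# max(0.2, 0.5) = 0.5s between request starts for that slot.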
target_concurrency: int, # max active with same id (domain) + max_delay_seconds: float, # max time to hold conn_retry_seconds: float, # seconds before connect retry + min_interval_seconds: float, ): - self.app = app # single lock, rather than one each for scoreboard, active count, # and each slot. Time spent with lock held should be small, # and lock ordering issues likely to make code complex and fragile! @@ -307,12 +394,15 @@ def __init__( self.big_lock = Lock( "big_lock", self.debug_info_nolock ) # covers ALL variables! - self.max_active = max_active self.target_concurrency = target_concurrency + self.max_delay_seconds = max_delay_seconds self.conn_retry_seconds = conn_retry_seconds + self.min_interval_seconds = min_interval_seconds + self.slots: Dict[str, Slot] = {} # map "id" (domain) to Slot - self.active_fetches = 0 self.active_slots = 0 + self.active_fetches = 0 + self.delayed = 0 # map thread name to ThreadStatus self.thread_status: Dict[str, ThreadStatus] = {} @@ -330,25 +420,35 @@ def _get_slot(self, slot_id: str) -> Slot: def _remove_slot(self, slot_id: str) -> None: del self.slots[slot_id] - def issue(self, slot_id: str, note: str) -> IssueReturn: + def get_delay(self, slot_id: str) -> float: + """ + called when story first picked up from message queue. + return time to hold message before starting (delayed counts incremented) + if too long (more than max_delay_seconds), returns -1 + """ with self.big_lock: - if self.active_fetches < self.max_active: - slot = self._get_slot(slot_id) - status = slot._issue() - if status == IssueStatus.OK: - # *MUST* call slot.retire() when done - if slot.active_count == 1: # was idle - self.active_slots += 1 - self.active_fetches += 1 - self._set_thread_status(note) # full URL - return IssueReturn(status, slot) - else: - status = IssueStatus.BUSY - return IssueReturn(status, None) + slot = self._get_slot(slot_id) + return slot._get_delay() - def _slot_retired(self, slot_idle: bool) -> None: + def start(self, slot_id: str, note: str) -> StartRet: + """ + here from worker thread, after delay (if any) + """ + with self.big_lock: + slot = self._get_slot(slot_id) + status = slot._start() + if status == StartStatus.OK: + # *MUST* call slot.finished() when done + if slot.active == 1: # was idle + self.active_slots += 1 + self.active_fetches += 1 + self._set_thread_status(note) # full URL + return StartRet(status, slot) + return StartRet(status, None) + + def _slot_finished(self, slot_idle: bool) -> None: """ - here from slot.retired() + here from slot.finished() """ # NOTE! lock held: avoid logging self.big_lock.assert_held() @@ -357,7 +457,6 @@ def _slot_retired(self, slot_idle: bool) -> None: if slot_idle: assert self.active_slots > 0 self.active_slots -= 1 - # XXX _consider_removing self._set_thread_status(TS_IDLE) def _set_thread_status(self, info: str) -> None: @@ -372,33 +471,26 @@ def _set_thread_status(self, info: str) -> None: ts.info = info ts.ts = time.monotonic() - def periodic(self, dump_slots: bool = False) -> None: + def periodic(self, dump_slots: bool = False) -> Stats: """ - called periodically from main thread + called periodically from main thread. + NOTE!! dump_slots logs with lock held!!!! + Use only for debug! """ with self.big_lock: # do this less frequently? for slot in list(self.slots.values()): slot._consider_removing() - # avoid stats, logging with lock held!!! - recent = len(self.slots) - active_fetches = self.active_fetches - active_slots = self.active_slots - - if dump_slots: + if dump_slots: # NOTE!!! 
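# A condensed sketch (not project code) of the calling sequence for this class:
# get_delay() runs in the Pika thread when a story arrives; start()/finish()
# run in a worker thread. "sb", "slot_id" and "url" are placeholders, and the
# elided fetch is whatever HTTP work the caller does.
import time
from indexer.workers.fetcher.sched import ConnStatus, ScoreBoard, StartStatus

def fetch_with_scoreboard(sb: ScoreBoard, slot_id: str, url: str) -> None:
    # assumes sb.get_delay(slot_id) was already called and any wait has elapsed
    status, slot = sb.start(slot_id, url)
    if status != StartStatus.OK or slot is None:
        return  # SKIP or BUSY: caller raises Retry / RequeueException instead
    t0 = time.monotonic()
    conn_status = ConnStatus.NOCONN
    try:
        ...  # perform the request; set conn_status to DATA or NODATA on success
        conn_status = ConnStatus.DATA
    finally:
        slot.finish(conn_status, time.monotonic() - t0)  # MUST always run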
logs with lock held!!! self.debug_info_nolock() - logger.info( - "%d recently active; %d URLs in %d domains active", - recent, - active_fetches, - active_slots, - ) - - self.app.gauge("active.recent", recent) - self.app.gauge("active.fetches", active_fetches) - self.app.gauge("active.slots", active_slots) + return Stats( + slots=len(self.slots), + active_fetches=self.active_fetches, + active_slots=self.active_slots, + delayed=self.delayed, + ) def debug_info_nolock(self) -> None: """ @@ -407,11 +499,15 @@ def debug_info_nolock(self) -> None: """ for domain, slot in list(self.slots.items()): logger.info( - "%s: %s last issue: %s last err: %s", + "%s: %s avg %.3f, %da, %dd, last issue: %s last err: %s next: %s", domain, ",".join(slot.active_threads), - slot.last_issue, + slot.avg_seconds, + slot.active, + slot.delayed, + slot.last_start, slot.last_conn_error, + slot.next_issue, ) # here without lock, so grab before examining: diff --git a/indexer/workers/fetcher/tqfetcher.py b/indexer/workers/fetcher/tqfetcher.py index 9f18a2e8..c779133f 100644 --- a/indexer/workers/fetcher/tqfetcher.py +++ b/indexer/workers/fetcher/tqfetcher.py @@ -11,33 +11,29 @@ be waiting for I/O (due to network/server latency), or CPU bound in SSL processing, neither of which requires holding the GIL. -When a Story can't be fetched because of connect rate or concurrency -limits, the Story is queued to a "fast delay" queue to avoid -bookkeeping complexity (and having an API that allows not ACKing a -message immediately). - -In theory we have thirty minutes to ACK a message before RabbitMQ has -a fit (closes connection), so holding on to Stories that can't be -processed immediately is POSSIBLE, *BUT* the existing framework acks -the story after process_message/story (in _process_messages) and -handles exceptions for retry and quarantine. - -Some thoughts on holding messages (2023-02-05): -* keep absolute "next_issue_time" in Slot -* assign incomming stories an absolute issue time based on next_issue_time -* if next_issue_time too far in the future (over 20 minutes), requeue. -* else call pika_connection.call_later w/ the entire InputMessage & callback -* increment Slot.next_issue_time by Slot.issue_interval -* callback queues InputMessage to work queue -* how to prevent starvation/stalling (hitting prefetch limit) caused by big sources/long delays? - + limit held stories per slot (calculate via (next_issue - now)/issue_interval???)? - + calculate limit as fraction of worker pool/prefetch size??? +We have thirty minutes to ACK a message before RabbitMQ has a fit +(closes connection), so: + * All scheduling done in Pika thread, as messages delivered by Pika + * As messages come to _on_input_message, the next time a fetch could + be issued is assigned by calling scoreboard.get_delay + * If the delay would mean the fetch would start more than BUSY_DELAY_MINUTES + in the future, the message is requeued to the "-fast" delay queue + (and will return in BUSY_DELAY_MINUTES). + * If connections to the server have failed "recently", behave as if + this connection failed, and requeue the story for retry. 
+ * Else call pika_connection.call_later w/ the entire InputMessage and + a callback to queue the InputMessage to the work queue (_message_queue) + and the InputMessage will be picked up by a worker thread and passed + to process_story() """ +# To find all stories_incr label names: +# egrep 'FetchReturn\(|GetIdReturn\(|incr_stor' tqfetcher.py + import argparse import logging -import random +import os import signal import sys import time @@ -46,6 +42,7 @@ import requests from mcmetadata.webpages import MEDIA_CLOUD_USER_AGENT +from pika.adapters.blocking_connection import BlockingChannel from requests.exceptions import RequestException from indexer.app import run @@ -56,27 +53,28 @@ non_news_fqdn, url_fqdn, ) -from indexer.worker import QuarantineException, RequeueException +from indexer.worker import InputMessage, QuarantineException, RequeueException from indexer.workers.fetcher.sched import ( + DELAY_LONG, + DELAY_SKIP, ConnStatus, - IssueReturn, - IssueStatus, ScoreBoard, + StartStatus, ) TARGET_CONCURRENCY = 10 # scrapy fetcher AUTOTHROTTLE_TARGET_CONCURRENCY -# Unless the input RSS entries are well mixed (and this would not be -# the case if the rss-fetcher queued RSS entries to us directly), RSS -# entries for the same domain will travel in packs/clumps/trains. If -# the "fast" delay is too long, that allows only one URL to be issued -# each time the train passes. So set the "fast" delay JUST long -# enough so they come back when intra-request issue interval has -# passed. Note: the intra-request interval is based on response time -# divided by SLOT_REQUESTS, so is not a constant. -BUSY_DELAY_SECONDS = 30.0 - -# time cache server as bad after a connection failure +# minimum interval between initiation of requests to a site +# lower values increase chance of concurrent connections to +# sites that respond quickly. +MIN_INTERVAL_SECONDS = 0.5 + +# default delay time for "fast" queue, and max time to delay stories +# w/ call_later. Large values allow more requests to be delayed, so +# keeping it small, hopefully breaking up clumps. 
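# Summary of the dispatch decision _on_input_message makes below, using the
# defaults in this file: BUSY_DELAY_MINUTES = 2 feeds both the "-fast" requeue
# TTL and the scoreboard's max_delay_seconds (2 * 60 = 120s). For each story:
#   delay = scoreboard.get_delay(id)
#   delay == DELAY_SKIP -> recent connection failure for this site: treat as a
#                          failed attempt (Retry, counted against the story)
#   delay == DELAY_LONG -> would start more than 120s out: requeue to the
#                          "-fast" queue, which returns it ~2 minutes later
#   delay == 0          -> hand straight to the worker pool (_message_queue)
#   0 < delay <= 120    -> connection.call_later(delay, put) holds it in the
#                          Pika thread, then hands it to the worker pool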
+BUSY_DELAY_MINUTES = 2 + +# time to cache server as down after a connection failure CONN_RETRY_MINUTES = 10 # requests timeouts: @@ -127,9 +125,13 @@ class FetchReturn(NamedTuple): quarantine: bool -class Fetcher(MultiThreadStoryWorker): - WORKER_THREADS_DEFAULT = 200 # equiv to 20 fetchers, with 10 active fetches +class GetIdReturn(NamedTuple): + status: str # counter name if != "ok" + url: str + id: str + +class Fetcher(MultiThreadStoryWorker): # Exceptions to discard instead of quarantine after repeated retries: # RequestException hierarchy includes bad URLs NO_QUARANTINE = (Retry, RequestException) @@ -138,15 +140,15 @@ def __init__(self, process_name: str, descr: str): super().__init__(process_name, descr) self.scoreboard: Optional[ScoreBoard] = None - self.previous_fragment = "" + self.prefetch = 0 def define_options(self, ap: argparse.ArgumentParser) -> None: super().define_options(ap) ap.add_argument( - "--busy-delay-seconds", + "--busy-delay-minutes", type=float, - default=BUSY_DELAY_SECONDS, - help=f"busy (fast) queue delay in seconds (default: {BUSY_DELAY_SECONDS})", + default=BUSY_DELAY_MINUTES, + help=f"busy (fast) queue delay in minutes (default: {BUSY_DELAY_MINUTES})", ) ap.add_argument( @@ -156,6 +158,13 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help=f"minutes to cache connection failure (default: {CONN_RETRY_MINUTES})", ) + ap.add_argument( + "--min-interval-seconds", + type=float, + default=MIN_INTERVAL_SECONDS, + help=f"minimum connection interval in seconds (default: {MIN_INTERVAL_SECONDS})", + ) + ap.add_argument( "--target-concurrency", type=int, @@ -163,13 +172,6 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help=f"goal for concurrent requests/fqdn (default: {TARGET_CONCURRENCY})", ) - ap.add_argument( - "--no-second-try", - default=False, - action="store_true", - help="don't try sleep/retry after busy", - ) - ap.add_argument( "--dump-slots", default=False, @@ -188,16 +190,15 @@ def process_args(self) -> None: ) sys.exit(1) + self.busy_delay_seconds = self.args.busy_delay_minutes * 60 + self.scoreboard = ScoreBoard( - self, - self.workers, - self.args.target_concurrency, - self.args.conn_retry_minutes * 60, + target_concurrency=self.args.target_concurrency, + max_delay_seconds=self.busy_delay_seconds, + conn_retry_seconds=self.args.conn_retry_minutes * 60, + min_interval_seconds=self.args.min_interval_seconds, ) - self.busy_delay_seconds = self.args.busy_delay_seconds - self.second_try = not self.args.no_second_try - self.set_requeue_delay_ms(1000 * self.busy_delay_seconds) # enable debug dump on SIGUSR1 @@ -207,6 +208,26 @@ def usr1_handler(sig: int, frame: Optional[FrameType]) -> None: signal.signal(signal.SIGUSR1, usr1_handler) + def _qos(self, chan: BlockingChannel) -> None: + """ + set "prefetch" limit, the number of unacked messages + RabbitMQ will send us at any time. + + Active requests, ready messages waiting in _message_queue, + and delayed (call_later) should total to the prefetch limit. + + NOTE!!! Failure to send an ACK to RabbitMQ for + CONSUMER_TIMEOUT_SECONDS for ANY message will cause + RabbitMQ to close the connection!!! So the estimate + MUST be prssimistic!! + """ + # Want to avoid very large numbers of requests in the "ready" + # state (in message queue), since there is no inter-request + # delay enforced once requests land there. 
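# A minimal sketch of the prefetch arithmetic _qos() applies (the worker count
# here is an arbitrary example, not a project default). basic_qos is the stock
# pika BlockingChannel call; capping unacked deliveries at 2 per worker thread
# bounds active + ready + delayed messages to that total.
from pika.adapters.blocking_connection import BlockingChannel

def apply_prefetch(chan: BlockingChannel, workers: int = 8) -> int:
    prefetch = workers * 2
    chan.basic_qos(prefetch_count=prefetch)
    return prefetch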
+ self.prefetch = self.workers * 2 + logger.info("prefetch %d", self.prefetch) + chan.basic_qos(prefetch_count=self.prefetch) + def periodic(self) -> None: """ called from main_loop @@ -215,9 +236,40 @@ def periodic(self) -> None: assert self.args with self.timer("status"): - self.scoreboard.periodic(self.args.dump_slots) + stats = self.scoreboard.periodic(self.args.dump_slots) + + ready = self.message_queue_len() # ready for workers + # delayed counts not adjusted until "start" called, + # so subtract messages in message_queue: + delayed = stats.delayed - ready + + load_avgs = os.getloadavg() + + # when input queue non-empty, first three should total to self.prefetch + logger.info( + "%d active, %d ready, %d delayed, for %d sites, %d recent; lavg %.2f", + stats.active_fetches, + ready, + delayed, + stats.active_slots, + stats.slots, + load_avgs[0], + ) + + def requests(label: str, count: int) -> None: + self.gauge("requests", count, labels=[("status", label)]) + + requests("active", stats.active_fetches) + requests("ready", ready) + requests("delayed", delayed) + + # above three should total to prefetch: + self.gauge("prefetch", self.prefetch) - def fetch(self, sess: requests.Session, fqdn: str, url: str) -> FetchReturn: + self.gauge("slots.recent", stats.slots) + self.gauge("slots.active", stats.active_slots) + + def fetch(self, sess: requests.Session, url: str) -> FetchReturn: """ perform HTTP get, tracking redirects looking for non-news domains @@ -250,8 +302,8 @@ def fetch(self, sess: requests.Session, fqdn: str, url: str) -> FetchReturn: # here with redirect: nextreq = resp.next # PreparedRequest | None if nextreq: - url = prepreq.url or "" prepreq = nextreq + url = prepreq.url or "" else: url = "" @@ -275,88 +327,134 @@ def fetch(self, sess: requests.Session, fqdn: str, url: str) -> FetchReturn: # end infinite redirect loop - def try_issue(self, id: str, url: str) -> IssueReturn: - # report time to issue: if this jumps up, it's - # likely due to lock contention! - assert self.scoreboard - with self.timer("issue"): - return self.scoreboard.issue(id, url) - - # OOF! flake8 complains "C901 'Fetcher.process_story' is too complex (19)" - # flake8: noqa: C901 - def process_story(self, sender: StorySender, story: BaseStory) -> None: + def get_id(self, story: BaseStory) -> GetIdReturn: """ - called from multiple worker threads!! - - This routine should call incr_stories EXACTLY once! + This function determines what stories are treated as from + the same "server". + + NOT using "domain" from RSS file because I originally + was planning to move the "issue" call inside the + redirect loop (getting clearance for each FQDN along the + chain), but if we ended up with a "busy", we'd have to + retry and start ALL over, or add a field to the Story + indicating the "next URL" to attempt to fetch, along + with a count of followed redirects. AND, using + "canonical" domain means EVERYTHING inside a domain + looks to be one server (when that may not be the case). + + *COULD* look up addresses, sort them, and pick the lowest or + highest?! this would avoid hitting single servers that handle + many thing.dom.ain names hard, but incurrs overhead (and + unless the id is stashed in the story object would require + multiple DNS lookups: initial Pika thread dispatch, in worker + thread for "start" call, and again for actual connection. + Hopefully the result is cached nearby, but it would still incurr + latency for due to system calls, network delay etc. 
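# A self-contained sketch of the manual redirect loop fetch() uses so each
# hop's hostname can be checked (e.g. against non_news_fqdn) before following
# it. Everything here is stock requests API; the hop limit and timeout are
# illustrative assumptions, not values from this project.
import requests

def fetch_following_redirects(url: str, max_hops: int = 10) -> requests.Response:
    sess = requests.Session()
    try:
        prepreq = sess.prepare_request(requests.Request("GET", url))
        for _ in range(max_hops):
            resp = sess.send(prepreq, allow_redirects=False, timeout=30)
            nextreq = resp.next  # PreparedRequest | None when not auto-following
            if nextreq is None:
                return resp  # terminal response (2xx, 4xx, 5xx, ...)
            # a real caller inspects nextreq.url / its FQDN here before continuing
            prepreq = nextreq
        raise RuntimeError("too many redirects")
    finally:
        sess.close()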
""" rss = story.rss_entry() url = rss.link if not url: - return self.incr_stories("no-url", repr(url)) - assert isinstance(url, str) - assert self.scoreboard is not None + return GetIdReturn("no-url", repr(url), "bad") - # NOTE!! fqdn isn't QUITE right: it means every - # foobar.blogspot.com is treated as a separate server. - # Really want to use IP address (see sched.py for more - # gory details), but that would require resolving the - # FQDN, and *THEN* using that address to make the HTTP - # connection *AND* subsequent redirect fetches (if the - # FQDN stays the same). - - # NOT using "domain" from RSS file because I originally - # was planning to move the "issue" call inside the - # redirect loop (getting clearance for each FQDN along the - # chain), but if we ended up with a "busy", we'd have to - # retry and start ALL over, or add a field to the Story - # indicating the "next URL" to attempt to fetch, along - # with a count of followed redirects. AND, using - # "canonical" domain means EVERYTHING inside a domain - # looks to be one server (when that may not be the case). + assert isinstance(url, str) # BEFORE issue (discard without locking/delay) try: fqdn = url_fqdn(url) except (TypeError, ValueError): - return self.incr_stories("badurl1", url) + return GetIdReturn("badurl1", url, fqdn) if non_news_fqdn(fqdn): # unlikely, if queuer does their job! - return self.incr_stories("non-news", url) - - ir = self.try_issue(fqdn, url) - if ir.status == IssueStatus.BUSY and self.second_try: - # Not inside "issue" to avoid messing up timing stats. - # All messages in the fast queue get the same delay, so - # introduce some randomness here, AND check a second time - # (improves output rate). With low RTT and high - # target_concurreny, most slots have an issue_interval - # under a second. - time.sleep(random.random()) - ir = self.try_issue(fqdn, url) - - if ir.slot is None: # could not be issued - if ir.status == IssueStatus.SKIPPED: - # Skipped due to recent connection error: Treat as if - # we saw an error as well (incrementing retry count on - # the Story) rather than possibly waiting 30 seconds - # for connection to time out again. After a failure - # the scheduler remembers the slot as failing for - # conn_retry_minutes - self.incr_stories("skipped", url) + return GetIdReturn("non-news", url, fqdn) + + return GetIdReturn("ok", url, fqdn) + + def _on_input_message(self, im: InputMessage) -> None: + """ + YIKES!! override a basic Worker method!!! + Performs an additional decode of serialized Story! + NOTE! Not covered by exception catching for retry!!! + MUST ack and commit before returning!!! + + pre-processes incomming stories, delaying them + (using the Pika "channel.call_later" method) + so that they're queued to the worker pool + with suitable inter-request delays for each server. + + DOES NOT INCREMENT STORY COUNTER!!! + (perhaps have a different counter??) + """ + assert self.scoreboard is not None + assert self.connection is not None + + try: + story = self.decode_story(im) + + status, url, id = self.get_id(story) + if status != "ok": + self.incr_stories(status, url) + self._pika_ack_and_commit(im) # drop (ack without requeuing) + return + + with self.timer("get_delay"): + delay = self.scoreboard.get_delay(id) + + logger.info("%s: delay %.3f", url, delay) + if delay >= 0: + # NOTE! Using pika connection.call_later because it's available. + # "put" does not need to be run in the Pika thread, and the + # delay _could_ be managed in another thread. 
+ def put() -> None: + # _put_message queue is the normal "_on_input_message" handler + logger.debug("put #%s", im.method.delivery_tag) + self._put_message_queue(im) + + # holding message, will be acked when processed + if delay == 0: + # enforce SOME kind of rate limit? + # see comments in Slot._get_delay() + put() + else: + logger.debug("delay #%s", im.method.delivery_tag) + self.connection.call_later(delay, put) + return + elif delay == DELAY_SKIP: raise Retry("skipped due to recent connection failure") + elif delay == DELAY_LONG: + self._requeue(im) else: - # here when "busy", due to one of (in order of likelihood): - # 1. per-fqdn connect interval not reached - # 2. per-fqdn currency limit reached - # 3. total concurrecy limit reached. - # requeue in short-delay queue, without counting as retry. - self.incr_stories("busy", url, log_level=logging.DEBUG) - raise RequeueException("busy") # does not increment retry count - - # ***NOTE*** here with slot marked active *MUST* call ir.slot.retire!!!! + raise Retry(f"unknown delay {delay}") + except Exception as exc: + self._retry(im, exc) + self._pika_ack_and_commit(im) + + def process_story(self, sender: StorySender, story: BaseStory) -> None: + """ + called in a worker thread + retry/quarantine exceptions handled normally + """ + istatus, url, id = self.get_id(story) + if istatus != "ok": + logger.warning("get_id returned ('%s', '%s')", istatus, id) + self.incr_stories(istatus, id) + return + + assert self.scoreboard is not None + start_status, slot = self.scoreboard.start(id, url) + if start_status == StartStatus.SKIP: + self.incr_stories("skipped2", url) + raise Retry("skipped due to recent connection failure") + elif start_status == StartStatus.BUSY: + self.incr_stories("busy", url) + raise RequeueException("busy") + elif start_status != StartStatus.OK: + logger.warning("start status %s: %s", start_status, url) + raise Retry(f"start status {start_status}") + assert slot is not None + + # ***NOTE*** here with slot marked active *MUST* call slot.finish!!!! t0 = time.monotonic() with self.timer("fetch"): # log starting URL @@ -364,7 +462,7 @@ def process_story(self, sender: StorySender, story: BaseStory) -> None: sess = requests.Session() try: # call retire on exit - fret = self.fetch(sess, fqdn, url) + fret = self.fetch(sess, url) if fret.resp and fret.resp.status_code == 200: conn_status = ConnStatus.DATA else: @@ -385,9 +483,10 @@ def process_story(self, sender: StorySender, story: BaseStory) -> None: raise # re-raised for retry counting finally: # ALWAYS: report slot now idle!! - with self.timer("retire"): + # jumps in timing indicate lock contention!! + with self.timer("finish"): # keep track of connection success, latency - ir.slot.retire(conn_status, time.monotonic() - t0) + slot.finish(conn_status, time.monotonic() - t0) sess.close() resp = fret.resp # requests.Response @@ -410,7 +509,6 @@ def process_story(self, sender: StorySender, story: BaseStory) -> None: raise Retry(msg) else: return self.incr_stories(counter, msg) - # here with status == 200 content = resp.content # bytes lcontent = len(content) diff --git a/pyproject.toml b/pyproject.toml index 204ebf59..2754e197 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "boto3 ~= 1.28.44", "docker ~= 6.1.0", "elasticsearch ~= 8.12.0", + # lxml installed by some other package? 
"mediacloud-metadata ~= 0.12.0", "pika ~= 1.3.2", "rabbitmq-admin ~= 0.2", @@ -23,8 +24,9 @@ dependencies = [ [project.optional-dependencies] dev = [ "boto3-stubs[s3] ~= 1.34.13", - "mypy ~= 1.5.1", "jinja2-cli ~= 0.8.2", + "lxml-stubs ~= 0.5.1", + "mypy ~= 1.5.1", "pre-commit ~= 3.4.0", "pytest ~= 7.4.2", "types-beautifulsoup4 ~= 4.12.0.20240106", diff --git a/requirements-dev.txt b/requirements-dev.txt index fe81bf62..9e1b0399 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -22,13 +22,13 @@ boilerpy3==1.0.7 # via mediacloud-metadata boto3==1.28.85 # via story-indexer (pyproject.toml) -boto3-stubs==1.34.54 +boto3-stubs==1.34.62 # via story-indexer (pyproject.toml) botocore==1.31.85 # via # boto3 # s3transfer -botocore-stubs==1.34.54 +botocore-stubs==1.34.62 # via boto3-stubs certifi==2024.2.2 # via @@ -146,13 +146,15 @@ lxml==4.9.4 # readability-lxml # scrapy # trafilatura +lxml-stubs==0.5.1 + # via story-indexer (pyproject.toml) markupsafe==2.1.5 # via jinja2 mediacloud-metadata==0.12.0 # via story-indexer (pyproject.toml) mypy==1.5.1 # via story-indexer (pyproject.toml) -mypy-boto3-s3==1.34.14 +mypy-boto3-s3==1.34.62 # via boto3-stubs mypy-extensions==1.0.0 # via mypy @@ -166,13 +168,13 @@ numpy==1.26.4 # via py3langid orderedmultidict==1.0.1 # via furl -packaging==23.2 +packaging==24.0 # via # docker # parsel # pytest # scrapy -parsel==1.8.1 +parsel==1.9.0 # via # itemloaders # scrapy @@ -204,7 +206,7 @@ pycparser==2.21 # via cffi pydispatcher==2.0.7 # via scrapy -pyopenssl==24.0.0 +pyopenssl==24.1.0 # via scrapy pytest==7.4.4 # via story-indexer (pyproject.toml) @@ -301,7 +303,7 @@ types-html5lib==1.1.11.20240228 # via types-beautifulsoup4 types-pika==1.2.0b1 # via story-indexer (pyproject.toml) -types-requests==2.31.0.20240218 +types-requests==2.31.0.20240311 # via story-indexer (pyproject.toml) types-s3transfer==0.10.0 # via boto3-stubs @@ -343,7 +345,7 @@ zope-interface==6.2 # twisted # The following packages are considered to be unsafe in a requirements file: -setuptools==69.1.1 +setuptools==69.2.0 # via # nodeenv # scrapy diff --git a/requirements.txt b/requirements.txt index 1394cb14..63fdf301 100644 --- a/requirements.txt +++ b/requirements.txt @@ -136,12 +136,12 @@ numpy==1.26.4 # via py3langid orderedmultidict==1.0.1 # via furl -packaging==23.2 +packaging==24.0 # via # docker # parsel # scrapy -parsel==1.8.1 +parsel==1.9.0 # via # itemloaders # scrapy @@ -167,7 +167,7 @@ pycparser==2.21 # via cffi pydispatcher==2.0.7 # via scrapy -pyopenssl==24.0.0 +pyopenssl==24.1.0 # via scrapy python-dateutil==2.9.0.post0 # via @@ -279,7 +279,7 @@ zope-interface==6.2 # twisted # The following packages are considered to be unsafe in a requirements file: -setuptools==69.1.1 +setuptools==69.2.0 # via # scrapy # supervisor