Skip to content

Commit

Permalink
bug-1911367: Remove processor heartbeat and process metrics (#6776)
Browse files Browse the repository at this point in the history
This removes processor heartbeat (and the scaffolding for that
heartbeat) which generated process metrics for the processor container.
We don't need those anymore--the problem those were added to help us
understand is long gone now.
  • Loading branch information
willkg authored Oct 31, 2024
1 parent 165fd98 commit fd89e5f
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 133 deletions.
25 changes: 3 additions & 22 deletions socorro/lib/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

LOGGER = logging.getLogger(__name__)

HEARTBEAT_INTERVAL = 60


def default_task_func(a_param):
"""Default task function.
Expand All @@ -21,16 +19,6 @@ def default_task_func(a_param):
"""


def default_heartbeat():
"""Runs once a second from the main thread.
Note: If this raises an exception, it could kill the process or put it in a
weird state.
"""
LOGGER.info("THUMP")


def default_iterator():
"""Default iterator for tasks.
Expand Down Expand Up @@ -76,7 +64,6 @@ def __init__(
idle_delay=7,
quit_on_empty_queue=False,
job_source_iterator=default_iterator,
heartbeat_func=default_heartbeat,
task_func=default_task_func,
):
"""
Expand All @@ -88,14 +75,12 @@ def __init__(
instantiated with a config object can be iterated. The iterator must
yield a tuple consisting of a function's tuple of args and, optionally,
a mapping of kwargs. Ex: (('a', 17), {'x': 23})
:arg heartbeat_func: a function to run every second
:arg task_func: a function that will accept the args and kwargs yielded
by the job_source_iterator
"""
self.idle_delay = idle_delay
self.quit_on_empty_queue = quit_on_empty_queue
self.job_source_iterator = job_source_iterator
self.heartbeat_func = heartbeat_func
self.task_func = task_func

self._pid = os.getpid()
Expand All @@ -109,7 +94,7 @@ def _get_iterator(self):
job_source_iterator can be one of a few things:
* a class that can be instantiated and iterated over
* a function that returns an interator
* a function that returns an iterator
* an actual iterator/generator
* an iterable collection
Expand All @@ -124,15 +109,15 @@ def _get_iterator(self):
def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
"""Responsive sleep that checks for quit flag
When there is litte work to do, the queuing thread sleeps a lot. It can't sleep
When there is little work to do, the queuing thread sleeps a lot. It can't sleep
for too long without checking for the quit flag and/or logging about why it is
sleeping.
:arg seconds: the number of seconds to sleep
:arg wait_log_interval: while sleeping, it is helpful if the thread
periodically announces itself so that we know that it is still alive.
This number is the time in seconds between log entries.
:arg wait_reason: the is for the explaination of why the thread is
:arg wait_reason: this is for the explanation of why the thread is
sleeping. This is likely to be a message like: 'there is no work to do'.
This was also partially motivated by the inability of old versions of Python to
Expand All @@ -146,14 +131,10 @@ def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):

def blocking_start(self):
"""This function starts the task manager running to do tasks."""
next_heartbeat = time.time() + HEARTBEAT_INTERVAL
self.logger.debug("threadless start")
try:
# May never exhaust
for job_params in self._get_iterator():
if time.time() > next_heartbeat:
self.heartbeat_func()
next_heartbeat = time.time() + HEARTBEAT_INTERVAL
self.logger.debug("received %r", job_params)
if job_params is None:
if self.quit_on_empty_queue:
Expand Down
12 changes: 1 addition & 11 deletions socorro/lib/threaded_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import time

from socorro.lib.task_manager import (
default_heartbeat,
default_iterator,
default_task_func,
TaskManager,
Expand All @@ -26,8 +25,6 @@

STOP_TOKEN = (None, None)

HEARTBEAT_INTERVAL = 60


class ThreadedTaskManager(TaskManager):
"""Threaded task manager."""
Expand All @@ -39,7 +36,6 @@ def __init__(
number_of_threads=4,
maximum_queue_size=8,
job_source_iterator=default_iterator,
heartbeat_func=default_heartbeat,
task_func=default_task_func,
):
"""
Expand All @@ -54,7 +50,6 @@ def __init__(
instantiated with a config object can be iterated. The iterator must
yield a tuple consisting of a function's tuple of args and, optionally,
a mapping of kwargs. Ex: (('a', 17), {'x': 23})
:arg heartbeat_func: a function to run every second
:arg task_func: a function that will accept the args and kwargs yielded
by the job_source_iterator
"""
Expand All @@ -71,7 +66,6 @@ def __init__(
idle_delay=idle_delay,
quit_on_empty_queue=quit_on_empty_queue,
job_source_iterator=job_source_iterator,
heartbeat_func=heartbeat_func,
task_func=task_func,
)
self.thread_list = [] # the thread object storage
Expand Down Expand Up @@ -107,12 +101,8 @@ def wait_for_completion(self):
if self.queueing_thread is None:
return

next_heartbeat = time.time() + HEARTBEAT_INTERVAL
self.logger.debug("waiting to join queueing_thread")
while True:
if time.time() > next_heartbeat:
self.heartbeat_func()
next_heartbeat = time.time() + HEARTBEAT_INTERVAL
try:
self.queueing_thread.join(1.0)
if not self.queueing_thread.is_alive():
Expand Down Expand Up @@ -149,7 +139,7 @@ def wait_for_empty_queue(self, wait_log_interval=0, wait_reason=""):
:arg wait_log_interval: While sleeping, it is helpful if the thread periodically
announces itself so that we know that it is still alive. This number is the
time in seconds between log entries.
:arg wait_reason: The is for the explaination of why the thread is sleeping.
:arg wait_reason: This is for the explanation of why the thread is sleeping.
This is likely to be a message like: 'there is no work to do'.
"""
Expand Down
61 changes: 0 additions & 61 deletions socorro/processor/processor_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from fillmore.libsentry import set_up_sentry
from fillmore.scrubber import Scrubber, SCRUB_RULES_DEFAULT
import psutil
import sentry_sdk
from sentry_sdk.integrations.atexit import AtexitIntegration
from sentry_sdk.integrations.dedupe import DedupeIntegration
Expand Down Expand Up @@ -275,73 +274,13 @@ def _set_up_task_manager(self):
manager_settings.update(
{
"job_source_iterator": self.source_iterator,
"heartbeat_func": self.heartbeat,
"task_func": self.transform,
}
)
self.task_manager = build_instance(
class_path=manager_class, kwargs=manager_settings
)

def heartbeat(self):
"""Runs once a second from the main thread.
Note: If this raises an exception, it could kill the process or put it in a
weird state.
"""
try:
processes_by_type = {}
processes_by_status = {}
open_files = 0
for proc in psutil.process_iter(["cmdline", "status", "open_files"]):
try:
# NOTE(willkg): This is all intertwined with exactly how we run the
# processor in a Docker container. If we ever make changes to that, this
# will change, too. However, even if we never update this, seeing
# "zombie" and "orphaned" as process statuses or seeing lots of
# processes as a type will be really fishy and suggestive that evil is a
# foot.
cmdline = proc.cmdline() or ["unknown"]

if cmdline[0] in ["/bin/sh", "/bin/bash"]:
proc_type = "shell"
elif cmdline[0] in ["python", "/usr/local/bin/python"]:
proc_type = "python"
elif "stackwalk" in cmdline[0]:
proc_type = "stackwalker"
else:
proc_type = "other"

open_files_count = len(proc.open_files())
proc_status = proc.status()

except psutil.Error:
# For any psutil error, we want to track that we saw a process, but
# the details don't matter
proc_type = "unknown"
proc_status = "unknown"
open_files_count = 0

processes_by_type[proc_type] = processes_by_type.get(proc_type, 0) + 1
processes_by_status[proc_status] = (
processes_by_status.get(proc_status, 0) + 1
)
open_files += open_files_count

METRICS.gauge("processor.open_files", open_files)
for proc_type, val in processes_by_type.items():
METRICS.gauge(
"processor.processes_by_type", val, tags=[f"proctype:{proc_type}"]
)
for status, val in processes_by_status.items():
METRICS.gauge(
"processor.processes_by_status", val, tags=[f"procstatus:{status}"]
)

except Exception as exc:
sentry_sdk.capture_exception(exc)

def close(self):
"""Clean up the processor on shutdown."""
with suppress(AttributeError):
Expand Down
24 changes: 0 additions & 24 deletions socorro/statsd_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,30 +144,6 @@ socorro.processor.minidumpstackwalk.run:
* ``outcome``: either ``success`` or ``fail``
* ``exitcode``: the exit code of the minidump stackwalk process
socorro.processor.open_files:
type: "gauge"
description: |
Gauge of currently open files for all processes running in the container.
socorro.processor.processes_by_type:
type: "gauge"
description: |
Gauge of processes by type.
Tags:
* ``proctype``: one of ``shell``, ``python``, ``stackwalker``, or ``other``
socorro.processor.processes_by_status:
type: "gauge"
description: |
Gauge of processes by process status.
Tags:
* ``procstatus``: one of ``running``, ``sleeping``, or other process
statuses.
socorro.processor.process_crash:
type: "timing"
description: |
Expand Down
15 changes: 0 additions & 15 deletions socorro/tests/processor/test_processor_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,6 @@ def test_source_iterator(self, processor_settings):
assert next(queue) is None
assert next(queue) == ((3,), {})

def test_heartbeat(self, sentry_helper):
"""Basic test to make sure it runs, captures metrics, and doesn't error out"""
with sentry_helper.reuse() as sentry_client:
with MetricsMock() as metricsmock:
app = ProcessorApp()
app.heartbeat()

# Assert it emitted some metrics
metricsmock.assert_gauge("socorro.processor.open_files")
metricsmock.assert_gauge("socorro.processor.processes_by_type")
metricsmock.assert_gauge("socorro.processor.processes_by_status")

# Assert it didn't throw an exception
assert len(sentry_client.envelopes) == 0

def test_transform_success(self, processor_settings):
app = ProcessorApp()
app._set_up_source_and_destination()
Expand Down

0 comments on commit fd89e5f

Please sign in to comment.