bug-1898341, bug-1898345: add "host" tag, fix metrics key prefix
This adds a `host` tag to emitted metrics when running in GCP. The tag value
is derived from the `HOSTNAME` configuration variable if it is set; otherwise
it falls back to `socket.gethostname()`, like our other services.

This changes Sentry and logging to use the `HOSTNAME` configuration variable
rather than `HOST_ID`. This brings us in line with other services as we
migrate to GCP.

This also adds a `"socorro"` prefix to all emitted keys, but only in the GCP
environments. This brings keys in line with our other services.

In order to do this, I had to create a singleton `METRICS` and then
rework everything to use that.
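
To illustrate the singleton pattern, here is a minimal sketch of a call site
after this change. The function below is hypothetical; only `METRICS` and
`socorro.libmarkus` come from this commit.

```python
# Hypothetical call site: modules now import the shared singleton instead of
# creating their own markus.get_metrics(...) instances.
from socorro.libmarkus import METRICS


def save_processed_crash(crash):  # illustrative, not part of this commit
    # Keys are written fully qualified; the singleton supplies the "socorro"
    # prefix (GCP only) and the "host" tag once set_up_metrics() has run.
    METRICS.incr("processor.save_processed_crash")
```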
willkg committed May 24, 2024
1 parent fa97274 commit a6c97ca
Showing 27 changed files with 283 additions and 198 deletions.
10 changes: 5 additions & 5 deletions docker/config/gunicorn_config.py
@@ -12,19 +12,19 @@
 
 LOGGING_LEVEL = CONFIG("LOGGING_LEVEL", "INFO")
 LOCAL_DEV_ENV = CONFIG("LOCAL_DEV_ENV", False, cast=bool)
-HOST_ID = socket.gethostname()
+HOSTNAME = CONFIG("HOSTNAME", default=socket.gethostname())
 
 
-class AddHostID(logging.Filter):
+class AddHostname(logging.Filter):
     def filter(self, record):
-        record.host_id = HOST_ID
+        record.hostname = HOSTNAME
         return True
 
 
 logconfig_dict = {
     "version": 1,
     "disable_existing_loggers": False,
-    "filters": {"add_hostid": {"()": AddHostID}},
+    "filters": {"add_hostname": {"()": AddHostname}},
     "handlers": {
         "console": {
             "level": LOGGING_LEVEL,
@@ -35,7 +35,7 @@ def filter(self, record):
             "level": LOGGING_LEVEL,
             "class": "logging.StreamHandler",
             "formatter": "mozlog",
-            "filters": ["add_hostid"],
+            "filters": ["add_hostname"],
         },
     },
     "formatters": {
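
As background on the mechanism this config relies on, here is a standalone
sketch (not part of the diff) of how a `logging.Filter` such as `AddHostname`
makes an attribute available to formatters:

```python
import logging
import socket


class AddHostname(logging.Filter):
    def filter(self, record):
        # Attach the hostname so formatters can reference %(hostname)s
        record.hostname = socket.gethostname()
        return True


handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(hostname)s %(levelname)s %(message)s"))
handler.addFilter(AddHostname())

logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.warning("hello")  # prints something like: myhost WARNING hello
```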
2 changes: 1 addition & 1 deletion fakecollector/collector.py
@@ -92,7 +92,7 @@ def on_submit(self, request):
 @click.option("--host", default="0.0.0.0", help="host to bind to")
 @click.option("--port", default=8000, type=int, help="port to listen on")
 def main(host, port):
-    set_up_logging(local_dev_env=True, host_id="localhost")
+    set_up_logging(local_dev_env=True, hostname="localhost")
 
     from werkzeug.serving import run_simple
     app = App()
4 changes: 0 additions & 4 deletions socorro/external/boto/crashstorage.py
@@ -5,8 +5,6 @@
 import json
 import logging
 
-import markus
-
 from socorro.external.crashstorage_base import (
     CrashStorageBase,
     CrashIDNotFound,
@@ -98,8 +96,6 @@ def __init__(
         self.bucket = bucket
         self.dump_file_suffix = dump_file_suffix
 
-        self.metrics = markus.get_metrics(metrics_prefix)
-
     @classmethod
     def build_connection(cls, region, access_key, secret_access_key, endpoint_url):
         """
6 changes: 4 additions & 2 deletions socorro/external/es/crashstorage.py
@@ -23,7 +23,7 @@
     is_indexable,
     parse_mapping,
 )
-
+from socorro.libmarkus import METRICS, build_prefix
 from socorro.lib.libdatetime import JsonDTEncoder, string_to_datetime, utc_now
@@ -293,7 +293,9 @@ def __init__(
 
         self.client = self.build_client(url=url, timeout=timeout)
 
-        self.metrics = markus.get_metrics(metrics_prefix)
+        # Create a MetricsInterface that includes the base prefix plus the prefix
+        # passed into __init__
+        self.metrics = markus.get_metrics(build_prefix(METRICS.prefix, metrics_prefix))
 
         self.index = index
         self.index_regex = index_regex
3 changes: 0 additions & 3 deletions socorro/external/gcs/crashstorage.py
@@ -5,7 +5,6 @@
 import json
 import os
 
-import markus
 from google.auth.credentials import AnonymousCredentials
 from google.api_core.exceptions import NotFound
 from google.cloud import storage
@@ -94,8 +93,6 @@ def __init__(
         self.bucket = bucket
         self.dump_file_suffix = dump_file_suffix
 
-        self.metrics = markus.get_metrics(metrics_prefix)
-
     def load_file(self, path):
        bucket = self.client.bucket(self.bucket)
        blob = bucket.blob(path)
23 changes: 15 additions & 8 deletions socorro/lib/liblogging.py
@@ -11,16 +11,23 @@
 def set_up_logging(
     local_dev_env=False,
     logging_level="INFO",
-    host_id=None,
+    hostname=None,
 ):
-    """Initialize Python logging."""
+    """Initialize Python logging.
+
+    :arg local_dev_env: whether or not this is running in a local development
+        environment
+    :arg logging_level: the logging level to emit records at
+    :arg hostname: the hostname for this instance
+
+    """
 
-    if host_id is None:
-        host_id = socket.gethostname()
+    if hostname is None:
+        hostname = socket.gethostname()
 
-    class AddHostID(logging.Filter):
+    class AddHostname(logging.Filter):
         def filter(self, record):
-            record.host_id = host_id
+            record.hostname = hostname
             return True
 
     class AddProcessName(logging.Filter):
@@ -34,7 +41,7 @@ def filter(self, record):
         "version": 1,
         "disable_existing_loggers": False,
         "filters": {
-            "add_hostid": {"()": AddHostID},
+            "add_hostname": {"()": AddHostname},
            "add_processname": {"()": AddProcessName},
         },
         "formatters": {
@@ -59,7 +66,7 @@ def filter(self, record):
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "mozlog",
-                "filters": ["add_hostid", "add_processname"],
+                "filters": ["add_hostname", "add_processname"],
            },
        },
    }
78 changes: 78 additions & 0 deletions socorro/libmarkus.py
@@ -0,0 +1,78 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+"""Holds Markus utility functions and global state."""
+
+import logging
+import os
+
+import markus
+from markus.filters import AddTagFilter
+
+
+_IS_MARKUS_SETUP = False
+
+# NOTE(willkg): this checks the CLOUD_PROVIDER environ directly here because this can't
+# import socorro.settings; this is temporary--once we finish the GCP migration, we can
+# remove this
+_CLOUD_PROVIDER = os.environ.get("CLOUD_PROVIDER", "AWS").upper()
+
+LOGGER = logging.getLogger(__name__)
+
+# NOTE(willkg): we need to set the prefix selectively because in AWS we don't have the
+# "socorro" key prefix, but in GCP we want one
+if _CLOUD_PROVIDER == "GCP":
+    METRICS = markus.get_metrics("socorro")
+else:
+    METRICS = markus.get_metrics()
+
+
+def set_up_metrics(statsd_host, statsd_port, hostname, debug=False):
+    """Initialize and configure the metrics system.
+
+    :arg statsd_host: the statsd host to send metrics to
+    :arg statsd_port: the port on the host to send metrics to
+    :arg hostname: the host name
+    :arg debug: whether or not to additionally log metrics to the logger
+
+    """
+    global _IS_MARKUS_SETUP, METRICS
+    if _IS_MARKUS_SETUP:
+        return
+
+    markus_backends = [
+        {
+            "class": "markus.backends.datadog.DatadogMetrics",
+            "options": {
+                "statsd_host": statsd_host,
+                "statsd_port": statsd_port,
+            },
+        }
+    ]
+    if debug:
+        markus_backends.append(
+            {
+                "class": "markus.backends.logging.LoggingMetrics",
+                "options": {
+                    "logger_name": "markus",
+                    "leader": "METRICS",
+                },
+            }
+        )
+
+    if _CLOUD_PROVIDER == "GCP" and hostname:
+        METRICS.filters.append(AddTagFilter(f"host:{hostname}"))
+
+    markus.configure(markus_backends)
+
+    _IS_MARKUS_SETUP = True
+
+
+def build_prefix(*parts):
+    """Join the non-empty parts with "." to build a metrics key prefix."""
+    new_prefix = []
+    for part in parts:
+        part = part.strip()
+        if part:
+            new_prefix.append(part)
+
+    return ".".join(new_prefix)
30 changes: 6 additions & 24 deletions socorro/mozilla_settings.py
@@ -53,8 +53,8 @@ def _or_none(val):
     doc="Whether or not this is a local development environment.",
 )
 
-HOST_ID = _config(
-    "HOST_ID",
+HOSTNAME = _config(
+    "HOSTNAME",
     default=socket.gethostname(),
     doc="Name of the host this is running on.",
 )
@@ -73,27 +73,9 @@ def _or_none(val):
     doc="Default logging level. Should be one of DEBUG, INFO, WARNING, ERROR.",
 )
 
-# Markus configuration for metrics
-MARKUS_BACKENDS = [
-    {
-        "class": "markus.backends.datadog.DatadogMetrics",
-        "options": {
-            "statsd_host": _config(
-                "STATSD_HOST",
-                default="localhost",
-                doc="statsd host.",
-            ),
-            "statsd_port": _config(
-                "STATSD_PORT",
-                default="8125",
-                parser=int,
-                doc="statsd port.",
-            ),
-        },
-    },
-]
-if LOCAL_DEV_ENV:
-    MARKUS_BACKENDS.append({"class": "markus.backends.logging.LoggingMetrics"})
-
+STATSD_HOST = _config("STATSD_HOST", default="localhost", doc="statsd host.")
+STATSD_PORT = _config("STATSD_PORT", default="8125", parser=int, doc="statsd port.")
 
 
 # Processor configuration
@@ -120,7 +102,7 @@ def _or_none(val):
         "class": "socorro.processor.pipeline.Pipeline",
         "options": {
             "rulesets": "socorro.mozilla_rulesets.RULESETS",
-            "host_id": HOST_ID,
+            "hostname": HOSTNAME,
         },
     },
     "temporary_path": _config(
47 changes: 29 additions & 18 deletions socorro/processor/cache_manager.py
@@ -33,9 +33,9 @@
 from fillmore.libsentry import set_up_sentry
 from fillmore.scrubber import Scrubber, SCRUB_RULES_DEFAULT
 from inotify_simple import INotify, flags, Event
-import markus
 
 from socorro import settings
+from socorro.libmarkus import METRICS, set_up_metrics
 from socorro.lib.libdockerflow import get_release_name, get_version_info
 from socorro.lib.liblogging import set_up_logging
@@ -46,15 +46,11 @@
 # How many seconds between heartbeats
 HEARTBEAT_INTERVAL = 60
 
-# Metrics client to use
-METRICS = markus.get_metrics("processor.cache_manager")
-
 
 def count_sentry_scrub_error(msg):
     # NOTE(willkg): we re-use the processor prefix here and differentiate with the
     # service tag.
-    metrics = markus.get_metrics("processor")
-    metrics.incr("sentry_scrub_error", value=1, tags=["service:cachemanager"])
+    METRICS.incr("processor.sentry_scrub_error", value=1, tags=["service:cachemanager"])
@@ -149,9 +145,14 @@ def set_up(self):
         set_up_logging(
             local_dev_env=settings.LOCAL_DEV_ENV,
             logging_level=settings.CACHE_MANAGER_LOGGING_LEVEL,
-            host_id=settings.HOST_ID,
+            hostname=settings.HOSTNAME,
         )
+        set_up_metrics(
+            statsd_host=settings.STATSD_HOST,
+            statsd_port=settings.STATSD_PORT,
+            hostname=settings.HOSTNAME,
+            debug=settings.LOCAL_DEV_ENV,
+        )
-        markus.configure(backends=settings.MARKUS_BACKENDS)
 
         scrubber = Scrubber(
             rules=SCRUB_RULES_DEFAULT,
@@ -160,7 +161,7 @@ def set_up(self):
         set_up_sentry(
             sentry_dsn=settings.SENTRY_DSN,
             release=get_release_name(self.basedir),
-            host_id=settings.HOST_ID,
+            host_id=settings.HOSTNAME,
             before_send=scrubber,
         )
 
@@ -273,7 +274,7 @@ def make_room(self, size):
                 continue
 
            self.logger.debug("evicted %s %s", rm_path, f"{rm_size:,d}")
-            METRICS.incr("evict")
+            METRICS.incr("processor.cache_manager.evict")
 
            self.total_size -= removed
 
@@ -339,7 +340,7 @@ def _event_generator(self, nonblocking=False):
                continue
 
            if flags.Q_OVERFLOW & event_mask:
-                METRICS.incr("q_overflow")
+                METRICS.incr("processor.cache_manager.q_overflow")
                continue
 
            try:
@@ -495,23 +496,33 @@ def _event_generator(self, nonblocking=False):
            now = time.time()
            if now > next_heartbeat:
                if is_verbose:
-                    METRICS.gauge("usage", value=self.total_size)
+                    METRICS.gauge(
+                        "processor.cache_manager.usage", value=self.total_size
+                    )
 
                    if self.lru:
                        sorted_sizes = list(sorted(self.lru.values()))
                        avg = int(sum(sorted_sizes) / len(sorted_sizes))
                        # Some metrics about file sizes
-                        METRICS.gauge("file_sizes.avg", avg)
-                        METRICS.gauge("file_sizes.median", get_index(sorted_sizes, 50))
-                        METRICS.gauge(
-                            "file_sizes.ninety_five", get_index(sorted_sizes, 95)
-                        )
-                        METRICS.gauge("file_sizes.max", sorted_sizes[-1])
+                        METRICS.gauge("processor.cache_manager.file_sizes.avg", avg)
+                        METRICS.gauge(
+                            "processor.cache_manager.file_sizes.median",
+                            get_index(sorted_sizes, 50),
+                        )
+                        METRICS.gauge(
+                            "processor.cache_manager.file_sizes.ninety_five",
+                            get_index(sorted_sizes, 95),
+                        )
+                        METRICS.gauge(
+                            "processor.cache_manager.file_sizes.max", sorted_sizes[-1]
+                        )
 
                        # Some metrics about what's in the cache
-                        METRICS.gauge("files.count", len(sorted_sizes))
+                        METRICS.gauge(
+                            "processor.cache_manager.files.count", len(sorted_sizes)
+                        )
                        gt_500 = len([fs for fs in sorted_sizes if fs > 500_000_000])
-                        METRICS.gauge("files.gt_500", gt_500)
+                        METRICS.gauge("processor.cache_manager.files.gt_500", gt_500)
 
                next_heartbeat = now + HEARTBEAT_INTERVAL
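
The net effect on emitted keys, as a sketch (assuming markus's usual behavior
of joining a non-empty interface prefix to the key with a `.`):

```python
# GCP: METRICS = markus.get_metrics("socorro") plus the host tag filter, so
METRICS.incr("processor.cache_manager.evict")
# emits "socorro.processor.cache_manager.evict" tagged with host:<HOSTNAME>.
# AWS: the singleton has no prefix and no host tag, so the same call emits
# "processor.cache_manager.evict" -- the same key as before this commit.
```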