forked from PostHog/posthog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgunicorn.config.py
254 lines (209 loc) · 8.02 KB
/
gunicorn.config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import os
import socket
import struct
import sys
import threading
import time
import structlog
from prometheus_client import CollectorRegistry, Gauge, multiprocess, start_http_server
loglevel = "error"
keepalive = 120
# Set the timeout to something lower than any downstreams, such that if the
# timeout is hit, then the worker will be killed and respawned, which will then
# we able to pick up any connections that were previously pending on the socket
# and serve the requests before the downstream timeout.
timeout = 15
grateful_timeout = 120
METRICS_UPDATE_INTERVAL_SECONDS = int(os.getenv("GUNICORN_METRICS_UPDATE_SECONDS", 5))
def when_ready(server):
"""
To ease being able to hide the /metrics endpoint when running in production,
we serve the metrics on a separate port, using the
prometheus_client.multiprocess Collector to pull in data from the worker
processes.
"""
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
port = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))
start_http_server(port=port, registry=registry)
# Start a thread in the Arbiter that will monitor the backlog on the sockets
# Gunicorn is listening on.
socket_monitor = SocketMonitor(server=server, registry=registry)
socket_monitor.start()
def post_fork(server, worker):
"""
Within each worker process, start a thread that will monitor the thread and
connection pool.
"""
worker_monitor = WorkerMonitor(worker=worker)
worker_monitor.start()
def worker_exit(server, worker):
"""
Ensure that we mark workers as dead with the prometheus_client such that
any cleanup can happen.
"""
multiprocess.mark_process_dead(worker.pid)
class SocketMonitor(threading.Thread):
"""
We have enabled the statsd collector for Gunicorn, but this doesn't include
the backlog due to concerns over portability, see
https://github.com/benoitc/gunicorn/pull/2407
Instead, we expose to Prometheus a gauge that will report the backlog size.
We can then:
1. use this to monitor how well the Gunicorn instances are keeping up with
requests.
2. use this metric to handle HPA scaling e.g. in Kubernetes
"""
def __init__(self, server, registry):
super().__init__()
self.daemon = True
self.server = server
self.registry = registry
def run(self):
"""
Every X seconds, check to see how many connections are pending for each
server socket.
We label each individually, as limits such as `--backlog` will apply to
each individually.
"""
if sys.platform != "linux":
# We use the assumption that we are on Linux to be able to get the
# socket backlog, so if we're not on Linux, we return immediately.
return
backlog_gauge = Gauge(
"gunicorn_pending_connections",
"The number of pending connections on all sockets. Linux only.",
registry=self.registry,
labelnames=["listener"],
)
while True:
for sock in self.server.LISTENERS:
backlog = self.get_backlog(sock=sock)
backlog_gauge.labels(listener=str(sock)).set(backlog)
time.sleep(METRICS_UPDATE_INTERVAL_SECONDS)
def get_backlog(self, sock):
# tcp_info struct from include/uapi/linux/tcp.h
fmt = "B" * 8 + "I" * 24
tcp_info_struct = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_INFO, 104)
# 12 is tcpi_unacked
return struct.unpack(fmt, tcp_info_struct)[12]
class WorkerMonitor(threading.Thread):
"""
There is a statsd logger support in Gunicorn that allows us to gather
metrics e.g. on the number of workers, requests, request duration etc. See
https://docs.gunicorn.org/en/stable/instrumentation.html for details.
To get a better understanding of the pool utilization, number of accepted
connections, we start a thread in head worker to report these via prometheus
metrics.
"""
def __init__(self, worker):
super().__init__()
self.daemon = True
self.worker = worker
def run(self):
"""
Every X seconds, check the status of the Thread pool, as well as the
"""
active_worker_connections = Gauge(
"gunicorn_active_worker_connections",
"Number of active connections.",
labelnames=["pid"],
)
max_worker_connections = Gauge(
"gunicorn_max_worker_connections",
"Maximum worker connections.",
labelnames=["pid"],
)
total_threads = Gauge(
"gunicorn_max_worker_threads",
"Size of the thread pool per worker.",
labelnames=["pid"],
)
active_threads = Gauge(
"gunicorn_active_worker_threads",
"Number of threads actively processing requests.",
labelnames=["pid"],
)
pending_requests = Gauge(
"gunicorn_pending_requests",
"Number of requests that have been read from a connection but have not completed yet",
labelnames=["pid"],
)
max_worker_connections.labels(pid=self.worker.pid).set(self.worker.cfg.worker_connections)
total_threads.labels(pid=self.worker.pid).set(self.worker.cfg.threads)
while True:
active_worker_connections.labels(pid=self.worker.pid).set(self.worker.nr_conns)
active_threads.labels(pid=self.worker.pid).set(min(self.worker.cfg.threads, len(self.worker.futures)))
pending_requests.labels(pid=self.worker.pid).set(len(self.worker.futures))
time.sleep(METRICS_UPDATE_INTERVAL_SECONDS)
LOGGING_FORMATTER_NAME = os.getenv("LOGGING_FORMATTER_NAME", "default")
# Setup stdlib logging to be handled by Structlog
def add_pid_and_tid(
logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict
) -> structlog.types.EventDict:
event_dict["pid"] = os.getpid()
event_dict["tid"] = threading.get_ident()
return event_dict
pre_chain = [
# Add the log level and a timestamp to the event_dict if the log entry
# is not from structlog.
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
add_pid_and_tid,
structlog.processors.TimeStamper(fmt="iso"),
]
# This is a copy the default logging config for gunicorn but with additions to:
#
# 1. non propagate loggers to the root handlers (otherwise we get duplicate log
# lines)
# 2. use structlog for processing of log records
#
# See
# https://github.com/benoitc/gunicorn/blob/0b953b803786997d633d66c0f7c7b290df75e07c/gunicorn/glogging.py#L48
# for the default log settings.
logconfig_dict = {
"version": 1,
"disable_existing_loggers": True,
"formatters": {
"default": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.dev.ConsoleRenderer(colors=True),
"foreign_pre_chain": pre_chain,
},
"json": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": structlog.processors.JSONRenderer(),
"foreign_pre_chain": pre_chain,
},
},
"root": {"level": "INFO", "handlers": ["console"]},
"loggers": {
"gunicorn.error": {
"level": "INFO",
"handlers": ["error_console"],
"propagate": False,
"qualname": "gunicorn.error",
},
"gunicorn.access": {
"level": "INFO",
"handlers": ["console"],
"propagate": False,
"qualname": "gunicorn.access",
},
},
"handlers": {
"error_console": {
"class": "logging.StreamHandler",
"formatter": LOGGING_FORMATTER_NAME,
"stream": "ext://sys.stderr",
},
"console": {
"class": "logging.StreamHandler",
"formatter": LOGGING_FORMATTER_NAME,
"stream": "ext://sys.stdout",
},
},
}