forked from honeycombio/honeytail
-
Notifications
You must be signed in to change notification settings - Fork 0
/
postgresql.go
334 lines (294 loc) · 11.1 KB
/
postgresql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
// Package postgresql contains code for parsing PostgreSQL slow query logs.
//
// The Postgres slow query format
// ------------------------------
//
// In general, Postgres logs consist of a prefix, a level, and a message:
//
// 2017-11-06 19:20:32 UTC [11534-2] LOG: autovacuum launcher shutting down
// |<-------------prefix----------->|level|<----------message-------------->|
//
// 2017-11-07 01:43:39 UTC [3542-7] postgres@test LOG: duration: 15.577 ms statement: SELECT * FROM test;
// |<-------------prefix------------------------>|level|<-------------------message---------------------->|
//
// The prefix format is set by `log_line_prefix` in postgresql.conf and is built
// from the following format specifiers:
//
// %a = application name
// %u = user name
// %d = database name
// %r = remote host and port
// %h = remote host
// %p = process ID
// %t = timestamp without milliseconds
// %m = timestamp with milliseconds
// %i = command tag
// %e = SQL state
// %c = session ID
// %l = session line number
// %s = session start timestamp
// %v = virtual transaction ID
// %x = transaction ID (0 if none)
// %n = timestamp with milliseconds, as a Unix epoch
// %q = stop here in non-session processes
// %% = '%'
//
// For example, the prefix format for the lines above is:
// %t [%p-%l] %q%u@%d
// The prefix format is passed to the parser as an option; if it is omitted, "%t [%p-%l] %u@%d" is assumed.
//
// Slow query logs specifically have the following format:
// 2017-11-07 01:43:39 UTC [3542-7] postgres@test LOG: duration: 15.577 ms statement: SELECT * FROM test;
// |<-------------prefix------------------------>|<----------header-------------------->|<-----query----->|
//
// For convenience, we call everything after the prefix but before the actual query string the "header".
//
// The query may span multiple lines; each continuation line begins with a tab character. For example:
//
// 2017-11-07 01:43:39 UTC [3542-7] postgres@test LOG: duration: 15.577 ms statement: SELECT * FROM test
// WHERE id=1;
package postgresql
import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/honeycombio/honeytail/event"
"github.com/honeycombio/honeytail/parsers"
"github.com/honeycombio/mysqltools/query/normalizer"
"github.com/sirupsen/logrus"
)
const (
	// timestampRe matches the timestamps PostgreSQL writes for %t, %m and %s:
	// a date, a time with optional fractional seconds, and a time zone that is
	// either an alphabetic abbreviation ("UTC") or a numeric offset ("-03",
	// "+0530"), the form used for zones that have no alphabetic abbreviation.
	timestampRe = `\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[.0-9]* (?:[A-Z]+|[+-]\d{2}(?:\d{2})?)`

	// defaultPrefix is the log_line_prefix assumed when none is configured.
	defaultPrefix = "%t [%p-%l] %u@%d"

	// slowQueryHeader matches the text between the log line prefix and the
	// query, e.g. " LOG:  duration: 15.577 ms  statement: ". It captures the
	// log level and the duration (in milliseconds).
	slowQueryHeader = `\s*(?P<level>[A-Z0-9]+):\s+duration: (?P<duration>[0-9\.]+) ms\s+(?:(statement)|(execute \S+)): `

	// traceFromTrace1 and traceFromTrace2 extract trace_id and parent_id from
	// a SQL comment such as /* trace_id='abc' parent_id='def' */; the two
	// patterns cover both orderings of the keys.
	traceFromTrace1 = `\/\*.*trace_id=['"](?P<trace_id>[^'"]+)['"].*parent_id=['"](?P<parent_id>[^'"]+)['"].*\*\/`
	traceFromTrace2 = `\/\*.*parent_id=['"](?P<parent_id>[^'"]+)['"].*trace_id=['"](?P<trace_id>[^'"]+)['"].*\*\/`

	// traceFromTraceParent extracts the trace and parent IDs from a
	// traceparent='<version>-<trace_id>-<parent_id>-<flags>' value inside a
	// SQL comment.
	traceFromTraceParent = `\/\*.*traceparent=['"]\d{2}-(?P<trace_id>[^-]+)-(?P<parent_id>[^-]+)-\d{2}['"].*\*\/`
)
// Precompiled slow-query-header and trace-comment patterns, wrapped in
// parsers.ExtRegexp so callers can pull named capture groups out as a map.
var (
	slowQueryHeaderRegex      = &parsers.ExtRegexp{Regexp: regexp.MustCompile(slowQueryHeader)}
	traceFromTraceRegex1      = &parsers.ExtRegexp{Regexp: regexp.MustCompile(traceFromTrace1)}
	traceFromTraceRegex2      = &parsers.ExtRegexp{Regexp: regexp.MustCompile(traceFromTrace2)}
	traceFromTraceParentRegex = &parsers.ExtRegexp{Regexp: regexp.MustCompile(traceFromTraceParent)}
)
// prefixField describes how a single log_line_prefix format specifier (such
// as %p) is captured: the name of the capture group it becomes and the regular
// expression its value must match.
type prefixField struct {
	Name, Pattern string
}

// ReString renders the field as a named capture group of the form
// "(?P<Name>Pattern)".
func (f *prefixField) ReString() string {
	// Every operand is a string, so Sprint concatenates them without spaces.
	return fmt.Sprint("(?P<", f.Name, ">", f.Pattern, ")")
}
// prefixValues maps each supported log_line_prefix format specifier to the
// event field it populates and the pattern that recognises its value.
// Specifiers not listed here are matched literally by buildPrefixRegexp.
var prefixValues = map[string]prefixField{
	"%a": {Name: "application", Pattern: "\\S+"},
	"%u": {Name: "user", Pattern: "\\S+"},
	"%d": {Name: "database", Pattern: "\\S+"},
	"%r": {Name: "host_port", Pattern: "\\S+"},
	"%h": {Name: "host", Pattern: "\\S+"},
	"%p": {Name: "pid", Pattern: "\\d+"},
	"%t": {Name: "timestamp", Pattern: timestampRe},
	"%m": {Name: "timestamp_millis", Pattern: timestampRe},
	// PostgreSQL prints %n as "<seconds>.<milliseconds>"; a bare integer is
	// still accepted for compatibility with earlier versions of this parser.
	"%n": {Name: "timestamp_unix", Pattern: `\d+(?:\.\d+)?`},
	"%i": {Name: "command_tag", Pattern: "\\S+"},
	"%e": {Name: "sql_state", Pattern: "\\S+"},
	// PostgreSQL prints %c as two lowercase hex numbers joined by a dot
	// (process start time and PID, e.g. "5a00d73b.2dce"). The dotted part is
	// optional so purely numeric values are still accepted.
	"%c": {Name: "session_id", Pattern: `[0-9a-f]+(?:\.[0-9a-f]+)?`},
	"%l": {Name: "session_line_number", Pattern: "\\d+"},
	"%s": {Name: "session_start", Pattern: timestampRe},
	"%v": {Name: "virtual_transaction_id", Pattern: "\\S+"},
	"%x": {Name: "transaction_id", Pattern: "\\S+"},
	// Query identifier (a signed 64-bit integer; 0 unless compute_query_id is on).
	"%Q": {Name: "query_id", Pattern: `-?\d+`},
	// Local server address the client connected to.
	"%L": {Name: "local_address", Pattern: "\\S+"},
}
type (
	// Options is the user-supplied configuration for the PostgreSQL parser.
	Options struct {
		// LogLinePrefix mirrors log_line_prefix from postgresql.conf. When it
		// is empty, Init falls back to defaultPrefix.
		LogLinePrefix string `long:"log_line_prefix" description:"Format string for PostgreSQL log line prefix"`
	}

	// Parser turns PostgreSQL slow query log lines into events.
	Parser struct {
		// pgPrefixRegex matches the postgres log_line_prefix configured by
		// the user; Init compiles it.
		pgPrefixRegex *parsers.ExtRegexp
	}
)
// Init configures the parser from options, which is expected to be an
// *Options. If options is not an *Options, is a nil *Options, or leaves
// LogLinePrefix empty, defaultPrefix is used instead. It returns an error when
// the prefix format cannot be compiled into a regular expression.
func (p *Parser) Init(options interface{}) (err error) {
	logLinePrefixFormat := defaultPrefix
	// A typed-nil *Options still satisfies the type assertion, so conf must be
	// checked before it is dereferenced.
	if conf, ok := options.(*Options); ok && conf != nil && conf.LogLinePrefix != "" {
		logLinePrefixFormat = conf.LogLinePrefix
	}
	p.pgPrefixRegex, err = buildPrefixRegexp(logLinePrefixFormat)
	if err != nil {
		return fmt.Errorf("compiling log_line_prefix %q: %w", logLinePrefixFormat, err)
	}
	return nil
}
// ProcessLines reads raw log lines from lines and groups each statement's
// first line with the tab-indented continuation lines that follow it. Each
// group goes to a handleEvents goroutine, which sends parsed events to send.
// ProcessLines returns once lines is closed and every group has been handled.
//
// When prefixRegex is non-nil, it is matched against each line, and a match
// found at the very start of the line is removed before grouping.
func (p *Parser) ProcessLines(lines <-chan string, send chan<- event.Event, prefixRegex *parsers.ExtRegexp) {
	groups := make(chan []string)
	var workers sync.WaitGroup
	workers.Add(1)
	go p.handleEvents(groups, send, &workers)

	var pending []string
	for raw := range lines {
		line := raw
		if prefixRegex != nil {
			// prefixRegex is the "global" --log_prefix regex, meant for
			// stripping prefixes added by syslog or similar; consumers of
			// database logs rarely set it. It is unrelated to p.pgPrefixRegex,
			// which parses the postgres-specific log_line_prefix.
			line = strings.TrimPrefix(line, prefixRegex.FindString(line))
		}

		startsNewStatement := !isContinuationLine(line)
		if startsNewStatement && len(pending) > 0 {
			// The previous statement is complete; hand it off and start a
			// fresh slice, because the receiver now owns the old one.
			groups <- pending
			pending = make([]string, 0, 1)
		}
		pending = append(pending, line)
	}

	// Flush whatever remains, then let the worker drain and exit.
	groups <- pending
	close(groups)
	workers.Wait()
}
// handleEvents drains rawEvents, where each element holds the lines of one log
// statement. It parses each group with handleEvent and forwards every
// successfully parsed event to send. It calls wg.Done once rawEvents has been
// closed and fully consumed.
func (p *Parser) handleEvents(rawEvents <-chan []string, send chan<- event.Event, wg *sync.WaitGroup) {
	defer wg.Done()
	// TODO: spin up a group of goroutines to do this
	for {
		group, open := <-rawEvents
		if !open {
			return
		}
		parsed := p.handleEvent(group)
		if parsed == nil {
			continue
		}
		send <- *parsed
	}
}
// handleEvent parses one grouped log statement (a first line plus any
// tab-indented continuation lines) into an Event.
//
// It returns nil in three cases:
//   - rawEvent is empty;
//   - the first line does not match the configured log_line_prefix;
//   - the text after the prefix is not a slow query header
//     ("LEVEL: duration: N ms statement: ...").
//
// On success the event carries:
//   - the prefix fields;
//   - the duration in milliseconds, when it parses;
//   - the full query and its normalized form;
//   - any tables, comments and trace IDs found in the query.
func (p *Parser) handleEvent(rawEvent []string) *event.Event {
	if len(rawEvent) == 0 {
		return nil
	}
	firstLine := rawEvent[0]

	// First, split off the postgres log_line_prefix.
	match, suffix, generalMeta := parsePrefix(p.pgPrefixRegex, firstLine)
	if !match {
		// Note: this may be noisy when debug logging is turned on, since the
		// postgres general log contains lots of other statements as well.
		logrus.WithField("line", firstLine).Debug("Log line prefix didn't match expected format")
		return nil
	}

	// Then parse the slow query header; whatever follows it starts the query.
	// This is checked before building the event so the many non-slow-query
	// lines of a general log do not allocate one.
	match, queryStart, slowQueryMeta := parsePrefix(slowQueryHeaderRegex, suffix)
	if !match {
		logrus.WithField("line", firstLine).Debug("didn't find slow query header, skipping line")
		return nil
	}

	ev := &event.Event{
		Data: make(map[string]interface{}),
	}
	addFieldsToEvent(generalMeta, ev)

	// Join continuation lines onto the query, replacing each line's leading
	// whitespace with a single space.
	var qb strings.Builder
	qb.WriteString(queryStart)
	for _, line := range rawEvent[1:] {
		qb.WriteByte(' ')
		qb.WriteString(strings.TrimLeft(line, " \t"))
	}
	query := qb.String()

	if rawDuration, ok := slowQueryMeta["duration"]; ok {
		// The header pattern accepts any run of digits and dots, so the value
		// can still fail to parse (e.g. "1.2.3"); don't record a bogus 0.
		if duration, err := strconv.ParseFloat(rawDuration, 64); err == nil {
			ev.Data["duration"] = duration
		} else {
			logrus.WithField("duration", rawDuration).WithError(err).Debug("Failed to parse query duration")
		}
	} else {
		logrus.WithField("query", query).Debug("Failed to find query duration in log line")
	}

	// Pull trace context out of a SQL comment, if the query carries one.
	if found, traceData := parseTraceData(query); found {
		if traceID, ok := traceData["trace_id"]; ok {
			ev.Data["trace.trace_id"] = traceID
		}
		if parentID, ok := traceData["parent_id"]; ok {
			ev.Data["trace.parent_id"] = parentID
		}
	}

	// Named qn so the imported normalizer package is not shadowed.
	qn := normalizer.Parser{}
	ev.Data["query"] = query
	ev.Data["normalized_query"] = qn.NormalizeQuery(query)
	if len(qn.LastTables) > 0 {
		ev.Data["tables"] = strings.Join(qn.LastTables, " ")
	}
	if len(qn.LastComments) > 0 {
		ev.Data["comments"] = "/* " + strings.Join(qn.LastComments, " */ /* ") + " */"
	}
	return ev
}
func parseTraceData(query string) (matched bool, fields map[string]string) {
match, _, traceData := parsePrefix(traceFromTraceRegex1, query)
if match {
return match, traceData
}
match, _, traceData = parsePrefix(traceFromTraceRegex2, query)
if match {
return match, traceData
}
match, _, traceData = parsePrefix(traceFromTraceParentRegex, query)
return match, traceData
}
// isContinuationLine reports whether line continues the previous log
// statement, which is exactly when its first byte is a tab.
func isContinuationLine(line string) bool {
	// IndexByte finds the first tab; index 0 means the line starts with one.
	return strings.IndexByte(line, '\t') == 0
}
// addFieldsToEvent copies the key-value metadata extracted from a log line
// into ev.Data. Values are handled as follows:
//   - pid, session_id and session_line_number are converted to ints when
//     they parse as decimal integers;
//   - the timestamp fields set ev.Timestamp instead of being stored;
//   - everything else is stored as a string.
//
// Timestamps that fail to parse are logged at debug level and skipped.
func addFieldsToEvent(fields map[string]string, ev *event.Event) {
	for k, v := range fields {
		switch k {
		case "session_id", "pid", "session_line_number":
			if typed, err := strconv.Atoi(v); err == nil {
				ev.Data[k] = typed
			} else {
				ev.Data[k] = v
			}
		case "timestamp", "timestamp_millis":
			if timestamp, err := parsePgTimestamp(v); err == nil {
				ev.Timestamp = timestamp
			} else {
				logrus.WithField("timestamp", v).WithError(err).Debug("Error parsing query timestamp")
			}
		case "timestamp_unix":
			if timestamp, err := parseUnixTimestamp(v); err == nil {
				ev.Timestamp = timestamp
			} else {
				logrus.WithField("timestamp", v).WithError(err).Debug("Error parsing query timestamp")
			}
		default:
			ev.Data[k] = v
		}
	}
}

// parsePgTimestamp parses a %t/%m timestamp such as
// "2017-11-06 19:20:32.123 UTC". Zones written as numeric offsets ("-03",
// "+0530") are parsed as offsets. Go's "MST" layout would accept "-03" only
// as an opaque zone name, typically with a zero offset, giving the wrong
// instant.
func parsePgTimestamp(v string) (time.Time, error) {
	layout := "2006-01-02 15:04:05.999 MST"
	if sp := strings.LastIndexByte(v, ' '); sp >= 0 && sp+1 < len(v) {
		zone := v[sp+1:]
		switch {
		case zone[0] != '+' && zone[0] != '-':
			// Alphabetic abbreviation such as "UTC"; keep the MST layout.
		case len(zone) > 3:
			layout = "2006-01-02 15:04:05.999 -0700"
		default:
			layout = "2006-01-02 15:04:05.999 -07"
		}
	}
	return time.Parse(layout, v)
}

// parseUnixTimestamp converts a %n value into a UTC time. It accepts two forms:
//   - "<seconds>.<fraction>", the form PostgreSQL writes;
//   - a bare integer, interpreted as milliseconds since the epoch (the
//     interpretation this parser has always used).
//
// Parsing is done in 64 bits so millisecond epochs do not overflow int on
// 32-bit platforms.
func parseUnixTimestamp(v string) (time.Time, error) {
	dot := strings.IndexByte(v, '.')
	if dot < 0 {
		ms, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			return time.Time{}, err
		}
		return time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond)).UTC(), nil
	}
	sec, err := strconv.ParseInt(v[:dot], 10, 64)
	if err != nil {
		return time.Time{}, err
	}
	frac := v[dot+1:]
	if frac == "" || len(frac) > 9 {
		return time.Time{}, fmt.Errorf("invalid fractional seconds in unix timestamp %q", v)
	}
	nsec, err := strconv.ParseUint(frac, 10, 32)
	if err != nil {
		return time.Time{}, err
	}
	// Scale the fraction up to nanoseconds, e.g. "123" -> 123000000.
	for n := len(frac); n < 9; n++ {
		nsec *= 10
	}
	return time.Unix(sec, int64(nsec)).UTC(), nil
}
// parsePrefix matches re against line. It reports whether re matched, the text
// of line that follows the match, and the named capture groups returned by
// re.FindStringSubmatchMap. On no match it returns false, "" and nil.
//
// re need not be anchored: parseTraceData uses this to find a comment anywhere
// in a query, and the slow query header pattern has no leading ^. The suffix
// is therefore taken from the match's actual position, not from offset 0.
func parsePrefix(re *parsers.ExtRegexp, line string) (matched bool, suffix string, fields map[string]string) {
	match, fields := re.FindStringSubmatchMap(line)
	if match == "" {
		return false, "", nil
	}
	// For these patterns (no \b, and ^ only at the start), the first occurrence
	// of the matched text is where the leftmost match begins.
	start := strings.Index(line, match)
	if start < 0 {
		// Defensive: keep the historical behavior if the returned text is not
		// a substring of line.
		start = 0
	}
	end := start + len(match)
	if end > len(line) {
		end = len(line)
	}
	return true, line[end:], fields
}
// buildPrefixRegexp compiles a PostgreSQL log_line_prefix format string into
// a regular expression anchored at the start of the line. Known specifiers
// become named capture groups taken from prefixValues.
//
// The format is scanned left to right in a single pass so that escapes are
// handled correctly:
//   - "%%" produces a literal '%' and is never re-read as the start of
//     another specifier;
//   - "%q" is dropped;
//   - known specifiers become capture groups;
//   - all other text is matched literally, including unknown specifiers and
//     a trailing lone '%'.
//
// It returns an error if the resulting expression fails to compile.
func buildPrefixRegexp(prefixFormat string) (*parsers.ExtRegexp, error) {
	var b strings.Builder
	b.WriteString("^")
	rest := prefixFormat
	for rest != "" {
		i := strings.IndexByte(rest, '%')
		if i < 0 || i == len(rest)-1 {
			// No further specifiers (or a dangling '%'): the rest is literal.
			b.WriteString(regexp.QuoteMeta(rest))
			break
		}
		b.WriteString(regexp.QuoteMeta(rest[:i]))
		spec := rest[i : i+2]
		rest = rest[i+2:]
		switch spec {
		case "%%":
			// '%' has no special meaning in regexp syntax.
			b.WriteString("%")
		case "%q":
			// %q means "if this log line isn't part of a session, stop here."
			// The slow query logs we care about always come from sessions,
			// so the remainder of the prefix is always present.
		default:
			if field, ok := prefixValues[spec]; ok {
				b.WriteString(field.ReString())
			} else {
				b.WriteString(regexp.QuoteMeta(spec))
			}
		}
	}
	re, err := regexp.Compile(b.String())
	if err != nil {
		return nil, err
	}
	return &parsers.ExtRegexp{Regexp: re}, nil
}