From 4c51e17d0657cc24f678eb7a750eb9bacc2619b0 Mon Sep 17 00:00:00 2001 From: Raphael Cohen Date: Wed, 11 Jan 2023 17:04:41 +0100 Subject: [PATCH] fix: Better handling of out of order events --- .golangci.yml | 2 +- README.md | 6 +- reassembler.go | 135 +++++++++++++++++++++++++++++++++++++------- reassembler_test.go | 64 +++++++++++++++++++++ 4 files changed, 184 insertions(+), 23 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index c5fc9d4..74205d0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -33,7 +33,7 @@ linters: linters-settings: goimports: - local-prefixes: github.com/elastic/go-libaudit + local-prefixes: github.com/SEKOIA-IO/go-libaudit gofumpt: extra-rules: true goheader: diff --git a/README.md b/README.md index 28ced7b..6062a2c 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Package documentation can be found on [GoDoc][godocs]. Installation can be done with a normal `go get`: ``` -$ go get github.com/elastic/go-libaudit +$ go get github.com/SEKOIA-IO/go-libaudit ``` ### audit example @@ -29,7 +29,7 @@ and outputs the data it receives to stdout. The system's `auditd` process should be stopped first. ``` -$ go install github.com/elastic/go-libaudit/cmd/audit@main +$ go install github.com/SEKOIA-IO/go-libaudit/cmd/audit@main $ sudo $GOPATH/bin/audit -d ``` @@ -40,7 +40,7 @@ process or the output of the _audit_ example command. It combines related log messages that are a part of the same event. ``` -$ go install github.com/elastic/go-libaudit/cmd/auparse@main +$ go install github.com/SEKOIA-IO/go-libaudit/cmd/auparse@main $ sudo cat /var/log/audit/audit.log | auparse --- type=CRED_ACQ msg=audit(1481077334.302:545): pid=1444 uid=0 auid=1000 ses=4 subj=unconfined_u:unconfined_r:unconfined_t:s0-s0:c0.c1023 msg='op=PAM:setcred grantors=pam_env,pam_unix acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=/dev/pts/1 res=success' diff --git a/reassembler.go b/reassembler.go index 17cc411..2052481 100644 --- a/reassembler.go +++ b/reassembler.go @@ -19,6 +19,7 @@ package libaudit import ( "errors" + "math" "sort" "sync" "sync/atomic" @@ -154,10 +155,13 @@ func (p sequenceNumSlice) Len() int { return len(p) } func (p sequenceNumSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p sequenceNumSlice) Sort() { sort.Sort(p) } +func sequenceHasRollover(i, j sequenceNum) bool { + return abs(int64(i)-int64(j)) > maxSortRange +} + func (p sequenceNumSlice) Less(i, j int) bool { // Handle sequence number rollover. - diff := abs(int64(p[i]) - int64(p[j])) - if diff > maxSortRange { + if sequenceHasRollover(p[i], p[j]) { return p[i] > p[j] } @@ -198,19 +202,21 @@ func (e *event) IsExpired() bool { type eventList struct { sync.Mutex - seqs sequenceNumSlice - events map[sequenceNum]*event - lastSeq sequenceNum - maxSize int - timeout time.Duration + seqs sequenceNumSlice + events map[sequenceNum]*event + lastSeq sequenceNum + maxSize int + timeout time.Duration + missingSequences map[sequenceNum]time.Time } func newEventList(maxSize int, timeout time.Duration) *eventList { return &eventList{ - seqs: make([]sequenceNum, 0, maxSize+1), - events: make(map[sequenceNum]*event, maxSize+1), - maxSize: maxSize, - timeout: timeout, + seqs: make([]sequenceNum, 0, maxSize+1), + events: make(map[sequenceNum]*event, maxSize+1), + maxSize: maxSize, + timeout: timeout, + missingSequences: make(map[sequenceNum]time.Time, maxSize+1), } } @@ -242,14 +248,15 @@ func (l *eventList) Clear() ([]*event, int) { seq = l.seqs[0] event := l.events[seq] - if l.lastSeq > 0 { - lost += int(seq) - int(l.lastSeq) - 1 - } - l.lastSeq = seq + lost += l.processSequence(seq) evicted = append(evicted, event) l.remove() } + // All remaining missing sequences are marked as lost + lost += len(l.missingSequences) + l.missingSequences = make(map[sequenceNum]time.Time, l.maxSize+1) + return evicted, lost } @@ -301,10 +308,7 @@ func (l *eventList) CleanUp() ([]*event, int) { event := l.events[seq] if event.complete || size > l.maxSize || event.IsExpired() { - if l.lastSeq > 0 { - lost += int(seq) - int(l.lastSeq) - 1 - } - l.lastSeq = seq + lost += l.processSequence(seq) evicted = append(evicted, event) l.remove() continue @@ -313,5 +317,98 @@ func (l *eventList) CleanUp() ([]*event, int) { break } + lost += l.cleanExpiredMissingSequences() + return evicted, lost } + +// trackMissingSequences marks all the sequences between the start sequence and the end sequence +// as missing, start and end excluded +func (l *eventList) trackMissingSequences(start, end sequenceNum) int { + missingSequences := end - start - 1 + if missingSequences == 0 { + return 0 // No missing sequences + } + var lost int + if missingSequences > sequenceNum(l.maxSize) { + // The gap is bigger than the maximum size allowed. + // We keep only the last ones and discard the others + newStart := end - sequenceNum(l.maxSize) - 1 // -1 to offset the + 1 in the for loop below + lost += int(newStart - start) + start = newStart + } + // If we have too many missing events we remove the old ones + nextMapSize := int(missingSequences + sequenceNum(len(l.missingSequences))) + if nextMapSize > l.maxSize { + lost += l.removeOldestMissingSequences(nextMapSize - l.maxSize) + } + // Add new sequences to the missing ones + l.addMissingSequencesToMap(start, end) + return lost +} + +// addMissingSequencesToMap effectively adds the missing sequence to the tracking map +func (l *eventList) addMissingSequencesToMap(start, end sequenceNum) { + if start > end { + // We lost events during the sequence id rollover + // so we fill the gap until the end and start again from 0 + maxSeq := sequenceNum(math.MaxUint32) + seq := start + 1 + for { + l.missingSequences[seq] = time.Now().Add(l.timeout) + if seq == maxSeq { + break + } + seq++ + } + start = maxSeq // maxSeq + 1 == 0 + } + for seq := start + 1; seq < end; seq++ { + l.missingSequences[seq] = time.Now().Add(l.timeout) + } +} + +// cleanExpiredMissingSequences remove from tracking expired missing sequences +// and returns their number +func (l *eventList) cleanExpiredMissingSequences() int { + lost := 0 + now := time.Now() + for seq, expireTime := range l.missingSequences { + if now.After(expireTime) { + delete(l.missingSequences, seq) + lost += 1 + } + } + return lost +} + +// removeOldestMissingSequences removes the oldest n sequences from the map +func (l *eventList) removeOldestMissingSequences(n int) int { + sequences := make(sequenceNumSlice, 0, l.maxSize+1) + for seq := range l.missingSequences { + sequences = append(sequences, seq) + } + sequences.Sort() + if n > len(sequences) { + n = len(sequences) // avoid having errors while slicing sequences + } + for _, seq := range sequences[:n] { + delete(l.missingSequences, seq) + } + return n +} + +func (l *eventList) processSequence(seq sequenceNum) int { + delete(l.missingSequences, seq) // In case the sequence was marked as missing + if seq <= l.lastSeq && !sequenceHasRollover(l.lastSeq, seq) { + // We already handled a sequence after this one, + // so no need to add missing sequences nor update the last sequence + return 0 + } + var lost int + if l.lastSeq > 0 { + lost = l.trackMissingSequences(l.lastSeq, seq) + } + l.lastSeq = seq + return lost +} diff --git a/reassembler_test.go b/reassembler_test.go index 2ff6048..341e33f 100644 --- a/reassembler_test.go +++ b/reassembler_test.go @@ -156,3 +156,67 @@ func TestSequenceNumSliceSort(t *testing.T) { assert.Equal(t, expected, seqs) } + +func TestUnorderedSequencesNoLostEvents(t *testing.T) { + list := newEventList(5, 1*time.Second) + list.Put(&auparse.AuditMessage{Sequence: uint32(2), RecordType: auparse.AUDIT_PROCTITLE}) + list.CleanUp() + list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE}) + list.CleanUp() + list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost := list.CleanUp() + assert.Equal(t, 0, lost) + _, lost = list.Clear() + assert.Equal(t, 0, lost) + + list = newEventList(5, 1*time.Second) + list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE}) + list.CleanUp() + list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost = list.CleanUp() + assert.Equal(t, 0, lost) // Event 2 is not lost yet + list.Put(&auparse.AuditMessage{Sequence: uint32(2), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost = list.CleanUp() + assert.Equal(t, 0, lost) + _, lost = list.Clear() + assert.Equal(t, 0, lost) +} + +func TestLostEventsOnClear(t *testing.T) { + list := newEventList(5, 1*time.Second) + var lost int + + list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE}) + list.CleanUp() + list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost = list.CleanUp() + assert.Equal(t, 0, lost) // Event is not lost yet + _, lost = list.Clear() + assert.Equal(t, 1, lost) // Now it is really lost +} + +func TestLostEventsOnClearRollover(t *testing.T) { + list := newEventList(5, 1*time.Second) + var lost int + + list.Put(&auparse.AuditMessage{Sequence: uint32(4294967293), RecordType: auparse.AUDIT_PROCTITLE}) + list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost = list.Clear() + assert.Equal(t, 5, lost) +} + +func TestLostEventsOnTimeout(t *testing.T) { + list := newEventList(5, 1*time.Millisecond) + var lost int + + list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE}) + list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost = list.CleanUp() + assert.Equal(t, 0, lost) // Event is not lost yet + list.Put(&auparse.AuditMessage{Sequence: uint32(100), RecordType: auparse.AUDIT_PROCTITLE}) + _, lost = list.CleanUp() + assert.Equal(t, 92, lost) // The maxSize is set to 5 so we get all the other events + time.Sleep(2 * time.Millisecond) + _, lost = list.CleanUp() + assert.Equal(t, 5, lost) +}