Skip to content

Commit

Permalink
Merge pull request #2 from SEKOIA-IO/fix/out_of_order_events
Browse files Browse the repository at this point in the history
fix: Better handling of out of order events
  • Loading branch information
Darkheir authored Jan 16, 2023
2 parents 7236291 + 4c51e17 commit 0245ecf
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ linters:

linters-settings:
goimports:
local-prefixes: github.com/elastic/go-libaudit
local-prefixes: github.com/SEKOIA-IO/go-libaudit
gofumpt:
extra-rules: true
goheader:
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Package documentation can be found on [GoDoc][godocs].
Installation can be done with a normal `go get`:

```
$ go get github.com/elastic/go-libaudit
$ go get github.com/SEKOIA-IO/go-libaudit
```

### audit example
Expand All @@ -29,7 +29,7 @@ and outputs the data it receives to stdout. The system's `auditd` process
should be stopped first.

```
$ go install github.com/elastic/go-libaudit/cmd/audit@main
$ go install github.com/SEKOIA-IO/go-libaudit/cmd/audit@main
$ sudo $GOPATH/bin/audit -d
```

Expand All @@ -40,7 +40,7 @@ process or the output of the _audit_ example command. It combines related log
messages that are a part of the same event.

```
$ go install github.com/elastic/go-libaudit/cmd/auparse@main
$ go install github.com/SEKOIA-IO/go-libaudit/cmd/auparse@main
$ sudo cat /var/log/audit/audit.log | auparse
---
type=CRED_ACQ msg=audit(1481077334.302:545): pid=1444 uid=0 auid=1000 ses=4 subj=unconfined_u:unconfined_r:unconfined_t:s0-s0:c0.c1023 msg='op=PAM:setcred grantors=pam_env,pam_unix acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=/dev/pts/1 res=success'
Expand Down
135 changes: 116 additions & 19 deletions reassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package libaudit

import (
"errors"
"math"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -154,10 +155,13 @@ func (p sequenceNumSlice) Len() int { return len(p) }
func (p sequenceNumSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sequenceNumSlice) Sort() { sort.Sort(p) }

func sequenceHasRollover(i, j sequenceNum) bool {
return abs(int64(i)-int64(j)) > maxSortRange
}

func (p sequenceNumSlice) Less(i, j int) bool {
// Handle sequence number rollover.
diff := abs(int64(p[i]) - int64(p[j]))
if diff > maxSortRange {
if sequenceHasRollover(p[i], p[j]) {
return p[i] > p[j]
}

Expand Down Expand Up @@ -198,19 +202,21 @@ func (e *event) IsExpired() bool {

type eventList struct {
sync.Mutex
seqs sequenceNumSlice
events map[sequenceNum]*event
lastSeq sequenceNum
maxSize int
timeout time.Duration
seqs sequenceNumSlice
events map[sequenceNum]*event
lastSeq sequenceNum
maxSize int
timeout time.Duration
missingSequences map[sequenceNum]time.Time
}

func newEventList(maxSize int, timeout time.Duration) *eventList {
return &eventList{
seqs: make([]sequenceNum, 0, maxSize+1),
events: make(map[sequenceNum]*event, maxSize+1),
maxSize: maxSize,
timeout: timeout,
seqs: make([]sequenceNum, 0, maxSize+1),
events: make(map[sequenceNum]*event, maxSize+1),
maxSize: maxSize,
timeout: timeout,
missingSequences: make(map[sequenceNum]time.Time, maxSize+1),
}
}

Expand Down Expand Up @@ -242,14 +248,15 @@ func (l *eventList) Clear() ([]*event, int) {
seq = l.seqs[0]
event := l.events[seq]

if l.lastSeq > 0 {
lost += int(seq) - int(l.lastSeq) - 1
}
l.lastSeq = seq
lost += l.processSequence(seq)
evicted = append(evicted, event)
l.remove()
}

// All remaining missing sequences are marked as lost
lost += len(l.missingSequences)
l.missingSequences = make(map[sequenceNum]time.Time, l.maxSize+1)

return evicted, lost
}

Expand Down Expand Up @@ -301,10 +308,7 @@ func (l *eventList) CleanUp() ([]*event, int) {
event := l.events[seq]

if event.complete || size > l.maxSize || event.IsExpired() {
if l.lastSeq > 0 {
lost += int(seq) - int(l.lastSeq) - 1
}
l.lastSeq = seq
lost += l.processSequence(seq)
evicted = append(evicted, event)
l.remove()
continue
Expand All @@ -313,5 +317,98 @@ func (l *eventList) CleanUp() ([]*event, int) {
break
}

lost += l.cleanExpiredMissingSequences()

return evicted, lost
}

// trackMissingSequences marks all the sequences between the start sequence and the end sequence
// as missing, start and end excluded
func (l *eventList) trackMissingSequences(start, end sequenceNum) int {
missingSequences := end - start - 1
if missingSequences == 0 {
return 0 // No missing sequences
}
var lost int
if missingSequences > sequenceNum(l.maxSize) {
// The gap is bigger than the maximum size allowed.
// We keep only the last ones and discard the others
newStart := end - sequenceNum(l.maxSize) - 1 // -1 to offset the + 1 in the for loop below
lost += int(newStart - start)
start = newStart
}
// If we have too many missing events we remove the old ones
nextMapSize := int(missingSequences + sequenceNum(len(l.missingSequences)))
if nextMapSize > l.maxSize {
lost += l.removeOldestMissingSequences(nextMapSize - l.maxSize)
}
// Add new sequences to the missing ones
l.addMissingSequencesToMap(start, end)
return lost
}

// addMissingSequencesToMap effectively adds the missing sequence to the tracking map
func (l *eventList) addMissingSequencesToMap(start, end sequenceNum) {
if start > end {
// We lost events during the sequence id rollover
// so we fill the gap until the end and start again from 0
maxSeq := sequenceNum(math.MaxUint32)
seq := start + 1
for {
l.missingSequences[seq] = time.Now().Add(l.timeout)
if seq == maxSeq {
break
}
seq++
}
start = maxSeq // maxSeq + 1 == 0
}
for seq := start + 1; seq < end; seq++ {
l.missingSequences[seq] = time.Now().Add(l.timeout)
}
}

// cleanExpiredMissingSequences remove from tracking expired missing sequences
// and returns their number
func (l *eventList) cleanExpiredMissingSequences() int {
lost := 0
now := time.Now()
for seq, expireTime := range l.missingSequences {
if now.After(expireTime) {
delete(l.missingSequences, seq)
lost += 1
}
}
return lost
}

// removeOldestMissingSequences removes the oldest n sequences from the map
func (l *eventList) removeOldestMissingSequences(n int) int {
sequences := make(sequenceNumSlice, 0, l.maxSize+1)
for seq := range l.missingSequences {
sequences = append(sequences, seq)
}
sequences.Sort()
if n > len(sequences) {
n = len(sequences) // avoid having errors while slicing sequences
}
for _, seq := range sequences[:n] {
delete(l.missingSequences, seq)
}
return n
}

func (l *eventList) processSequence(seq sequenceNum) int {
delete(l.missingSequences, seq) // In case the sequence was marked as missing
if seq <= l.lastSeq && !sequenceHasRollover(l.lastSeq, seq) {
// We already handled a sequence after this one,
// so no need to add missing sequences nor update the last sequence
return 0
}
var lost int
if l.lastSeq > 0 {
lost = l.trackMissingSequences(l.lastSeq, seq)
}
l.lastSeq = seq
return lost
}
64 changes: 64 additions & 0 deletions reassembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,67 @@ func TestSequenceNumSliceSort(t *testing.T) {

assert.Equal(t, expected, seqs)
}

func TestUnorderedSequencesNoLostEvents(t *testing.T) {
list := newEventList(5, 1*time.Second)
list.Put(&auparse.AuditMessage{Sequence: uint32(2), RecordType: auparse.AUDIT_PROCTITLE})
list.CleanUp()
list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE})
list.CleanUp()
list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE})
_, lost := list.CleanUp()
assert.Equal(t, 0, lost)
_, lost = list.Clear()
assert.Equal(t, 0, lost)

list = newEventList(5, 1*time.Second)
list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE})
list.CleanUp()
list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE})
_, lost = list.CleanUp()
assert.Equal(t, 0, lost) // Event 2 is not lost yet
list.Put(&auparse.AuditMessage{Sequence: uint32(2), RecordType: auparse.AUDIT_PROCTITLE})
_, lost = list.CleanUp()
assert.Equal(t, 0, lost)
_, lost = list.Clear()
assert.Equal(t, 0, lost)
}

func TestLostEventsOnClear(t *testing.T) {
list := newEventList(5, 1*time.Second)
var lost int

list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE})
list.CleanUp()
list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE})
_, lost = list.CleanUp()
assert.Equal(t, 0, lost) // Event is not lost yet
_, lost = list.Clear()
assert.Equal(t, 1, lost) // Now it is really lost
}

func TestLostEventsOnClearRollover(t *testing.T) {
list := newEventList(5, 1*time.Second)
var lost int

list.Put(&auparse.AuditMessage{Sequence: uint32(4294967293), RecordType: auparse.AUDIT_PROCTITLE})
list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE})
_, lost = list.Clear()
assert.Equal(t, 5, lost)
}

func TestLostEventsOnTimeout(t *testing.T) {
list := newEventList(5, 1*time.Millisecond)
var lost int

list.Put(&auparse.AuditMessage{Sequence: uint32(1), RecordType: auparse.AUDIT_PROCTITLE})
list.Put(&auparse.AuditMessage{Sequence: uint32(3), RecordType: auparse.AUDIT_PROCTITLE})
_, lost = list.CleanUp()
assert.Equal(t, 0, lost) // Event is not lost yet
list.Put(&auparse.AuditMessage{Sequence: uint32(100), RecordType: auparse.AUDIT_PROCTITLE})
_, lost = list.CleanUp()
assert.Equal(t, 92, lost) // The maxSize is set to 5 so we get all the other events
time.Sleep(2 * time.Millisecond)
_, lost = list.CleanUp()
assert.Equal(t, 5, lost)
}

0 comments on commit 0245ecf

Please sign in to comment.