Skip to content

Commit

Permalink
[Winlogbeat] Improve query building and error recovery (#42187)
Browse files Browse the repository at this point in the history
* Retry on publisher disabled error

* Split query to never surpass 22 clauses

* Add changelog entry
  • Loading branch information
marc-gr authored Jan 3, 2025
1 parent 580f0f6 commit 279d5cc
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 85 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Format embedded messages in the experimental api {pull}41525[41525]
- Implement exclusion range support for event_id. {issue}38623[38623] {pull}41639[41639]
- Make the experimental API GA and rename it to winlogbeat-raw {issue}39580[39580] {pull}41770[41770]
- Remove 22 clause limitation {issue}35047[35047] {pull}42187[42187]
- Add handling for recoverable publisher disabled errors {issue}35316[35316] {pull}42187[42187]


*Functionbeat*
Expand Down
34 changes: 0 additions & 34 deletions winlogbeat/docs/winlogbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -241,40 +241,6 @@ winlogbeat.event_logs:
event_id: 4624, 4625, 4700-4800, -4735, -4701-4710
--------------------------------------------------------------------------------

[WARNING]
=======================================
If you specify more than 22 query conditions (event IDs or event ID ranges), some
versions of Windows will prevent {beatname_uc} from reading the event log due to
limits in the query system. If this occurs a similar warning as shown below will
be logged by {beatname_uc}, and it will continue processing data from other event
logs.
`WARN EventLog[Application] Open() error. No events will be read from this
source. The specified query is invalid.`
In some cases, the limit may be lower than 22 conditions. For instance, using a
mixture of ranges and single event IDs, along with an additional parameter such
as `ignore older`, results in a limit of 21 conditions.
If you have more than 22 conditions, you can workaround this Windows limitation
by using a drop_event[drop-event] processor to do the filtering after
{beatname_uc} has received the events from Windows. The filter shown below is
equivalent to `event_id: 903, 1024, 4624` but can be expanded beyond 22
event IDs.
[source,yaml]
--------------------------------------------------------------------------------
winlogbeat.event_logs:
- name: Security
processors:
- drop_event.when.not.or:
- equals.winlog.event_id: 903
- equals.winlog.event_id: 1024
- equals.winlog.event_id: 4624
--------------------------------------------------------------------------------
=======================================

[float]
==== `event_logs.language`

Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/eventlog/errors_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func IsRecoverable(err error) bool {
return err == win.ERROR_INVALID_HANDLE || err == win.RPC_S_SERVER_UNAVAILABLE ||
err == win.RPC_S_CALL_CANCELLED || err == win.ERROR_EVT_QUERY_RESULT_STALE ||
err == win.ERROR_INVALID_PARAMETER
err == win.ERROR_INVALID_PARAMETER || err == win.ERROR_EVT_PUBLISHER_DISABLED
}

// IsChannelNotFound returns true if the error indicates the channel was not found.
Expand Down
131 changes: 91 additions & 40 deletions winlogbeat/sys/wineventlog/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,17 @@ import (
const (
query = `<QueryList>
<Query Id="0">
<Select Path="{{.Path}}">*{{if .Select}}[System[{{join .Select " and "}}]]{{end}}</Select>{{if .Suppress}}
{{- if .Select}}{{range $s := .Select}}
<Select Path="{{$.Path}}">*[System[{{join $s " and "}}]]</Select>{{end}}
{{- else}}
<Select Path="{{.Path}}">*</Select>
{{- end}}
{{- if .Suppress}}
<Suppress Path="{{.Path}}">*[System[{{.Suppress}}]]</Suppress>{{end}}
</Query>
</QueryList>`

queryClauseLimit = 21
)

var (
Expand Down Expand Up @@ -74,49 +81,55 @@ type Query struct {
// Build builds a query from the given parameters. The query is returned as a
// XML string and can be used with Subscribe function.
func (q Query) Build() (string, error) {
var errs multierror.Errors
if q.Log == "" {
errs = append(errs, fmt.Errorf("empty log name"))
}

qp := &queryParams{Path: q.Log}
builders := []func(Query) error{
qp.ignoreOlderSelect,
qp.eventIDSelect,
qp.levelSelect,
qp.providerSelect,
}
for _, build := range builders {
if err := build(q); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return "", errs.Err()
qp, err := newQueryParams(q)
if err != nil {
return "", err
}
return executeTemplate(queryTemplate, qp)
}

// queryParams are the parameters that are used to create a query from a
// template.
type queryParams struct {
ignoreOlder string
level string
provider string
selectEventFilters []string

Path string
Select []string
Select [][]string
Suppress string
}

func (qp *queryParams) ignoreOlderSelect(q Query) error {
if q.IgnoreOlder <= 0 {
return nil
func newQueryParams(q Query) (*queryParams, error) {
var errs multierror.Errors
if q.Log == "" {
errs = append(errs, fmt.Errorf("empty log name"))
}
qp := &queryParams{
Path: q.Log,
}
qp.withIgnoreOlder(q)
qp.withProvider(q)
if err := qp.withEventFilters(q); err != nil {
errs = append(errs, err)
}
if err := qp.withLevel(q); err != nil {
errs = append(errs, err)
}
qp.buildSelects()
return qp, errs.Err()
}

func (qp *queryParams) withIgnoreOlder(q Query) {
if q.IgnoreOlder <= 0 {
return
}
ms := q.IgnoreOlder.Nanoseconds() / int64(time.Millisecond)
qp.Select = append(qp.Select,
fmt.Sprintf("TimeCreated[timediff(@SystemTime) &lt;= %d]", ms))
return nil
qp.ignoreOlder = fmt.Sprintf("TimeCreated[timediff(@SystemTime) &lt;= %d]", ms)
}

func (qp *queryParams) eventIDSelect(q Query) error {
func (qp *queryParams) withEventFilters(q Query) error {
if q.EventID == "" {
return nil
}
Expand Down Expand Up @@ -155,10 +168,26 @@ func (qp *queryParams) eventIDSelect(q Query) error {
}
}

if len(includes) == 1 {
qp.Select = append(qp.Select, includes...)
} else if len(includes) > 1 {
qp.Select = append(qp.Select, "("+strings.Join(includes, " or ")+")")
actualLim := queryClauseLimit - len(q.Provider)
if q.IgnoreOlder > 0 {
actualLim--
}
if q.Level != "" {
actualLim--
}
// we split selects in chunks of at most queryClauseLim size
for i := 0; i < len(includes); i += actualLim {
end := i + actualLim
if end > len(includes) {
end = len(includes)
}
chunk := includes[i:end]

if len(chunk) == 1 {
qp.selectEventFilters = append(qp.selectEventFilters, chunk...)
} else if len(chunk) > 1 {
qp.selectEventFilters = append(qp.selectEventFilters, "("+strings.Join(chunk, " or ")+")")
}
}

if len(excludes) > 0 {
Expand All @@ -168,7 +197,7 @@ func (qp *queryParams) eventIDSelect(q Query) error {
return nil
}

// levelSelect returns a xpath selector for the event Level. The returned
// withLevel returns a xpath selector for the event Level. The returned
// selector will select events with levels less than or equal to the specified
// level. Note that level 0 is used as a catch-all/unknown level.
//
Expand All @@ -179,7 +208,7 @@ func (qp *queryParams) eventIDSelect(q Query) error {
// warning, warn - 3
// error, err - 2
// critical, crit - 1
func (qp *queryParams) levelSelect(q Query) error {
func (qp *queryParams) withLevel(q Query) error {
if q.Level == "" {
return nil
}
Expand Down Expand Up @@ -208,25 +237,47 @@ func (qp *queryParams) levelSelect(q Query) error {
}

if len(levelSelect) > 0 {
qp.Select = append(qp.Select, "("+strings.Join(levelSelect, " or ")+")")
qp.level = "(" + strings.Join(levelSelect, " or ") + ")"
}

return nil
}

func (qp *queryParams) providerSelect(q Query) error {
func (qp *queryParams) withProvider(q Query) {
if len(q.Provider) == 0 {
return nil
return
}

selects := make([]string, 0, len(q.Provider))
for _, p := range q.Provider {
selects = append(selects, fmt.Sprintf("@Name='%s'", p))
}

qp.Select = append(qp.Select,
fmt.Sprintf("Provider[%s]", strings.Join(selects, " or ")))
return nil
qp.provider = fmt.Sprintf("Provider[%s]", strings.Join(selects, " or "))
}

func (qp *queryParams) buildSelects() {
if len(qp.selectEventFilters) == 0 {
sel := appendIfNotEmpty(qp.ignoreOlder, qp.level, qp.provider)
if len(sel) == 0 {
return
}
qp.Select = append(qp.Select, sel)
return
}
for _, f := range qp.selectEventFilters {
qp.Select = append(qp.Select, appendIfNotEmpty(qp.ignoreOlder, f, qp.level, qp.provider))
}
}

func appendIfNotEmpty(ss ...string) []string {
var sel []string
for _, s := range ss {
if s != "" {
sel = append(sel, s)
}
}
return sel
}

// executeTemplate populates a template with the given data and returns the
Expand Down
24 changes: 23 additions & 1 deletion winlogbeat/sys/wineventlog/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestProviderQuery(t *testing.T) {
func TestCombinedQuery(t *testing.T) {
const expected = `<QueryList>
<Query Id="0">
<Select Path="Application">*[System[TimeCreated[timediff(@SystemTime) &lt;= 3600000] and (EventID=1 or (EventID &gt;= 1 and EventID &lt;= 100)) and (Level = 3)]]</Select>
<Select Path="Application">*[System[TimeCreated[timediff(@SystemTime) &lt;= 3600000] and (EventID=1 or (EventID &gt;= 1 and EventID &lt;= 100)) and (Level = 3) and Provider[@Name='Foo' or @Name='Bar' or @Name='Bazz']]]</Select>
<Suppress Path="Application">*[System[(EventID=75 or (EventID &gt;= 97 and EventID &lt;= 99))]]</Suppress>
</Query>
</QueryList>`
Expand All @@ -108,6 +108,28 @@ func TestCombinedQuery(t *testing.T) {
IgnoreOlder: time.Hour,
EventID: "1, 1-100, -75, -97-99",
Level: "Warning",
Provider: []string{"Foo", "Bar", "Bazz"},
}.Build()
if assert.NoError(t, err) {
assert.Equal(t, expected, q)
t.Log(q)
}
}

func TestCombinedQuerySplit(t *testing.T) {
const expected = `<QueryList>
<Query Id="0">
<Select Path="Application">*[System[(EventID=1 or EventID=2 or EventID=3 or EventID=4 or EventID=5 or EventID=6 or EventID=7 or EventID=8 or EventID=9 or EventID=10 or EventID=11 or EventID=12 or EventID=13 or EventID=14 or EventID=15 or EventID=16 or EventID=17 or EventID=18) and (Level = 0 or Level = 4) and Provider[@Name='Microsoft-Windows-User Profiles Service' or @Name='Windows Error Reporting']]]</Select>
<Select Path="Application">*[System[(EventID=19 or (EventID &gt;= 20 and EventID &lt;= 100) or EventID=1001) and (Level = 0 or Level = 4) and Provider[@Name='Microsoft-Windows-User Profiles Service' or @Name='Windows Error Reporting']]]</Select>
<Suppress Path="Application">*[System[(EventID=75 or (EventID &gt;= 97 and EventID &lt;= 99))]]</Suppress>
</Query>
</QueryList>`

q, err := Query{
Log: "Application",
EventID: "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20-100,-75,-97-99,1001",
Level: "Information",
Provider: []string{"Microsoft-Windows-User Profiles Service", "Windows Error Reporting"},
}.Build()
if assert.NoError(t, err) {
assert.Equal(t, expected, q)
Expand Down
19 changes: 10 additions & 9 deletions winlogbeat/sys/wineventlog/syscall_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ const NilHandle EvtHandle = 0
// Event log error codes.
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382(v=vs.85).aspx
const (
ERROR_INVALID_HANDLE syscall.Errno = 6
ERROR_INVALID_PARAMETER syscall.Errno = 87
ERROR_INSUFFICIENT_BUFFER syscall.Errno = 122
ERROR_NO_MORE_ITEMS syscall.Errno = 259
RPC_S_SERVER_UNAVAILABLE syscall.Errno = 1722
RPC_S_INVALID_BOUND syscall.Errno = 1734
RPC_S_CALL_CANCELLED syscall.Errno = 1818
ERROR_INVALID_OPERATION syscall.Errno = 4317
ERROR_EVT_CHANNEL_NOT_FOUND syscall.Errno = 15007
ERROR_INVALID_HANDLE syscall.Errno = 6
ERROR_INVALID_PARAMETER syscall.Errno = 87
ERROR_INSUFFICIENT_BUFFER syscall.Errno = 122
ERROR_NO_MORE_ITEMS syscall.Errno = 259
RPC_S_SERVER_UNAVAILABLE syscall.Errno = 1722
RPC_S_INVALID_BOUND syscall.Errno = 1734
RPC_S_CALL_CANCELLED syscall.Errno = 1818
ERROR_INVALID_OPERATION syscall.Errno = 4317
ERROR_EVT_CHANNEL_NOT_FOUND syscall.Errno = 15007
ERROR_EVT_PUBLISHER_DISABLED syscall.Errno = 15037
)

// EvtSubscribeFlag defines the possible values that specify when to start subscribing to events.
Expand Down

0 comments on commit 279d5cc

Please sign in to comment.