Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/okta: avoid work on un…
Browse files Browse the repository at this point in the history
…wanted datasets (elastic#36770)

During full sync the provider may have state from a previous dataset. So
in the case that the user has changed dataset from users to devices or
vice versa the provider may publish already existing state in the entity
graph. This change adds conditional checks to ensure that unwanted
dataset records are not published.
  • Loading branch information
efd6 authored Oct 9, 2023
1 parent ba2a641 commit 1b3786e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ is collected by it.
- Allow http_endpoint input to receive PUT and PATCH requests. {pull}36734[36734]
- Add cache processor. {pull}36786[36786]
- Avoid unwanted publication of Azure entity records. {pull}36753[36753]
- Avoid unwanted publication of Okta entity records. {pull}36770[36770]

*Auditbeat*

Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/okta/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,21 @@ func (c *conf) Validate() error {
return errors.New("dataset must be 'all', 'users', 'devices' or empty")
}
}

func (c *conf) wantUsers() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "users":
return true
default:
return false
}
}

func (c *conf) wantDevices() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "devices":
return true
default:
return false
}
}
25 changes: 13 additions & 12 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/hashicorp/go-retryablehttp"
Expand Down Expand Up @@ -253,16 +252,22 @@ func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien
return err
}

if len(state.users) != 0 || len(state.devices) != 0 {
wantUsers := p.cfg.wantUsers()
wantDevices := p.cfg.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
if wantUsers {
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}
}
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
if wantDevices {
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
}
}

end := time.Now()
Expand Down Expand Up @@ -339,9 +344,7 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
// any existing deltaLink will be ignored, forcing a full synchronization from Okta.
// Returns a set of modified users by ID.
func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) {
switch strings.ToLower(p.cfg.Dataset) {
case "", "all", "users":
default:
if !p.cfg.wantUsers() {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.cfg.Dataset)
return nil, nil
}
Expand Down Expand Up @@ -426,9 +429,7 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
// synchronization from Okta.
// Returns a set of modified devices by ID.
func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool) ([]*Device, error) {
switch strings.ToLower(p.cfg.Dataset) {
case "", "all", "devices":
default:
if !p.cfg.wantDevices() {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.cfg.Dataset)
return nil, nil
}
Expand Down

0 comments on commit 1b3786e

Please sign in to comment.