Skip to content

Commit

Permalink
Merge branch 'main' into add-degrading-config
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana authored Jan 17, 2025
2 parents 77de6cd + ef3bd69 commit 3fab802
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
12 changes: 6 additions & 6 deletions .buildkite/packaging.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ steps:
# this prevents parallel builds and possibility of publishing out of order DRA artifacts if the first job takes longer than the second

- name: Start of concurrency group for DRA Snapshot
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true"
if: (build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true") && build.env('VERSION_QUALIFIER') == null
command: echo "--> Start of concurrency gate dra-snapshot"
concurrency_group: "dra-gate-snapshot-$BUILDKITE_BRANCH"
concurrency: 1
Expand All @@ -38,7 +38,7 @@ steps:
key: dashboards
steps:
- label: Snapshot dashboards
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true"
if: (build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true") && build.env('VERSION_QUALIFIER') == null
depends_on: start-gate-snapshot
key: dashboards-snapshot
# TODO: container with go and make
Expand Down Expand Up @@ -83,7 +83,7 @@ steps:
- build/distributions/**/*

- group: Packaging snapshot
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true"
if: (build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true") && build.env('VERSION_QUALIFIER') == null
key: packaging-snapshot
depends_on: start-gate-snapshot
steps:
Expand Down Expand Up @@ -261,7 +261,7 @@ steps:
steps:
- label: DRA Snapshot
## Only for release branches and main
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true"
if: (build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true") && build.env('VERSION_QUALIFIER') == null
key: dra-snapshot
env:
DRA_WORKFLOW: snapshot
Expand Down Expand Up @@ -300,13 +300,13 @@ steps:
- wait

- command: echo "End of concurrency gate dra-snapshot <--"
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true"
if: (build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.branch == 'main' || build.env('RUN_SNAPSHOT') == "true") && build.env('VERSION_QUALIFIER') == null
concurrency_group: "dra-gate-snapshot-$BUILDKITE_BRANCH"
concurrency: 1
key: end-gate-snapshot

- command: echo "End of concurrency gate dra-staging <--"
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/
if: build.branch =~ /^[0-9]+\.[0-9x]+\$/ || build.env('VERSION_QUALIFIER') != null
concurrency_group: "dra-gate-staging-$BUILDKITE_BRANCH"
concurrency: 1
key: end-gate-staging
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078]
- Fix Netflow Template Sharing configuration handling. {pull}42080[42080]
- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218]
- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315]

*Heartbeat*

Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/streaming/crowdstrike.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
if err != nil {
return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)}
}
s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta))
s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta))

var offset int
if cursor, ok := state["cursor"].(map[string]any); ok {
Expand Down Expand Up @@ -233,6 +233,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
err := dec.Decode(&msg)
if err != nil {
s.metrics.errorsTotal.Inc()
//nolint:errorlint // will not be a wrapped error here.
if err == io.EOF {
s.log.Info("stream ended, restarting")
return state, nil
Expand All @@ -241,7 +242,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
}
s.metrics.receivedBytesTotal.Add(uint64(len(msg)))
state["response"] = []byte(msg)
s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), "msg", debugMsg(msg))
s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.log.Errorw("failed to process and publish data", "error", err)
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {

// ensures this is the last connection closed when the function returns
defer func() {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
if c != nil {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
}
}()

Expand Down Expand Up @@ -217,7 +219,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
}
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
s.log.Debugw("received websocket message", logp.Namespace(s.ns), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
Expand Down Expand Up @@ -294,7 +296,7 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l
buf.WriteString("... truncated")
}

log.Debugw("websocket connection response", "body", &buf)
log.Debugw("websocket connection response", "http.response.body.content", &buf)
}
}

Expand Down

0 comments on commit 3fab802

Please sign in to comment.