Release 1.3.1 (#364)

* Refactoring sonarqube issues (#355)
* Refactoring various functions to lower complexity
* Set "keepalive timeout", closing the splunk connection every 5 second - with configuration (#354)
* Keepalive timeout config (#361)
* Updated dependencies (#363)
* Version bump (#365)
* Edge processor configuration
* Upgrade to golang 1.20 and upgrade compatible dependencies

---------

Co-authored-by: VihasMakwana <[email protected]>
Co-authored-by: Vihas Splunk <[email protected]>
Co-authored-by: Harry Metske <[email protected]>
4 people authored Mar 29, 2024
1 parent 0d2ff2b commit 767d1f6
Showing 563 changed files with 18,542 additions and 11,609 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/main.yml
@@ -183,6 +183,17 @@ jobs:
- name: Nozzle Log
run: |
cf logs splunk-firehose-nozzle &
- name: Get tile name
run: |
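# grep picks the generated .pivotal tile; writing the pair to "$GITHUB_ENV"
# exposes it as env.tile_name to the "Upload tile" step below.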
echo "tile_name=$(ls tile/product | grep ".pivotal")" >> "$GITHUB_ENV"
- name: Upload tile
uses: actions/upload-artifact@v2
with:
name: ${{ env.tile_name }}
path: tile/product/${{ env.tile_name }}


# Skip test for now!
execute_tests:
@@ -258,4 +269,4 @@ jobs:
echo "Teardown deployment env"
cf delete splunk-firehose-nozzle -f
cf delete data_gen -f
cf delete-org splunk-ci-org -f
cf delete-org splunk-ci-org -f
15 changes: 13 additions & 2 deletions README.md
@@ -111,6 +111,8 @@ This is recommended for dev environments only.
* `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h; for example, 3600s, 60m, or 1h) for enabling monitoring (metric data with insights into the connector). Default is 0s (disabled).
* `SPLUNK_METRIC_INDEX`: Index into which metric data will be ingested when the monitoring module is enabled
* `SELECTED_MONITORING_METRICS`: Names of the metrics that you want to monitor, given as comma-separated values. The metrics supported by the monitoring module are listed below
* `REFRESH_SPLUNK_CONNECTION`: If set to true, the nozzle will periodically refresh its connection to Splunk (how often depends on the KEEP_ALIVE_TIMER value). If set to false, the connection is kept alive and reused. (Default: false)
* `KEEP_ALIVE_TIMER`: Time after which the connection to Splunk is refreshed, if REFRESH_SPLUNK_CONNECTION is set to true (in s/m/h; for example, 3600s, 60m, or 1h). (Default: 30s) A sketch of parsing these values follows below.
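
A minimal sketch of how these two settings could be parsed, using only the standard library; the struct and helper names are illustrative, not the nozzle's actual code:

```
package main

import (
	"fmt"
	"os"
	"time"
)

// connSettings is a hypothetical holder for the two options documented above.
type connSettings struct {
	RefreshSplunkConnection bool
	KeepAliveTimer          time.Duration
}

func connSettingsFromEnv() connSettings {
	s := connSettings{KeepAliveTimer: 30 * time.Second} // documented default
	if os.Getenv("REFRESH_SPLUNK_CONNECTION") == "true" {
		s.RefreshSplunkConnection = true
	}
	if v := os.Getenv("KEEP_ALIVE_TIMER"); v != "" {
		// time.ParseDuration accepts the documented s/m/h forms: "3600s", "60m", "1h".
		if d, err := time.ParseDuration(v); err == nil {
			s.KeepAliveTimer = d
		}
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", connSettingsFromEnv())
}
```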

__About app cache params:__

@@ -276,7 +278,7 @@ FORMAT = new_index
<p class="note"><strong>Note:</strong>Moving from version 1.2.4 to 1.2.5, timestamp will use nanosecond precision instead of milliseconds.</p>
__Monitoring(Metric data Ingestion):__
## __Monitoring(Metric data Ingestion):__
| Metric Name | Description
|---|---
@@ -301,6 +303,15 @@ __Monitoring(Metric data Ingestion):__
<p class="note"><strong>Note:</strong>Select value Rate(Avg) for Aggregation from Analysis tab on the top right.</p>
### Routing data through Edge Processor via HEC
Logs can be routed to Splunk via Edge Processor. Assuming that you have a working Edge Processor instance, you can use it with minimal
changes to the nozzle configuration.
Configuration fields that you should change are (see the sketch after this list):
* `SPLUNK_HOST`: Use the host of your Edge Processor instance instead of Splunk. Example: https://x.x.x.x:8088.
* `SPLUNK_TOKEN`: A required parameter: the token used to authorize your requests, which can be found in the Edge Processor settings. If your
Edge Processor token authentication is turned off, you can enter a placeholder value instead (e.g. "-").
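
For orientation, the eventwriter diff below shows that the nozzle posts HEC batches to `<SPLUNK_HOST>/services/collector`, so an Edge Processor host slots in exactly where a Splunk host would. A standalone illustration (the address is a placeholder):

```
package main

import "fmt"

func main() {
	// Mirrors the endpoint construction in eventwriter/splunk_event.go's send();
	// the host stands in for an Edge Processor instance.
	host := "https://x.x.x.x:8088"
	fmt.Printf("%s/services/collector\n", host)
}
```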
## <a id='walkthrough'></a> Troubleshooting
This topic describes how to troubleshoot Splunk Firehose Nozzle for Cloud Foundry.
@@ -508,7 +519,7 @@ $ chmod +x tools/nozzle.sh
Build project:
```
$ make VERSION=1.3.0
$ make VERSION=1.3.1
```
Run tests with [Ginkgo](http://onsi.github.io/ginkgo/)
43 changes: 24 additions & 19 deletions cache/cache_easyjson.go
@@ -4,6 +4,7 @@ package cache

import (
json "encoding/json"

jlexer "github.com/mailru/easyjson/jlexer"
jwriter "github.com/mailru/easyjson/jwriter"
)
@@ -43,25 +44,7 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac
case "OrgGuid":
out.OrgGuid = string(in.String())
case "CfAppEnv":
if in.IsNull() {
in.Skip()
} else {
in.Delim('{')
if !in.IsDelim('}') {
out.CfAppEnv = make(map[string]interface{})
} else {
out.CfAppEnv = nil
}
for !in.IsDelim('}') {
key := string(in.String())
in.WantColon()
var v1 interface{}
v1 = in.Interface()
(out.CfAppEnv)[key] = v1
in.WantComma()
}
in.Delim('}')
}
parseCfAppEnv(in, out)
case "IgnoredApp":
out.IgnoredApp = bool(in.Bool())
default:
@@ -71,6 +54,28 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac
}
in.Delim('}')
}

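// parseCfAppEnv decodes the optional CfAppEnv JSON object into out.CfAppEnv,
// leaving it nil when the incoming value is null or the object is empty.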
func parseCfAppEnv(in *jlexer.Lexer, out *App) {
if in.IsNull() {
in.Skip()
} else {
in.Delim('{')
if !in.IsDelim('}') {
out.CfAppEnv = make(map[string]interface{})
} else {
out.CfAppEnv = nil
}
for !in.IsDelim('}') {
key := string(in.String())
in.WantColon()
v1 := in.Interface()
(out.CfAppEnv)[key] = v1
in.WantComma()
}
in.Delim('}')
}
}

func easyjsonA591d1bcEncodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCache(out *jwriter.Writer, in App) {
out.RawByte('{')
first := true
1 change: 0 additions & 1 deletion eventrouter/default.go
@@ -38,7 +38,6 @@ func (r *router) Route(msg *events.Envelope) error {
// Ignore this event since we are not interested
return nil
}

_ = r.sink.Write(msg)

return nil
92 changes: 49 additions & 43 deletions events/events.go
@@ -195,57 +195,63 @@ func ContainerMetric(msg *events.Envelope) *Event {
}

func (e *Event) AnnotateWithAppData(appCache cache.Cache, config *Config) {
cf_app_id := e.Fields["cf_app_id"]
appGuid := fmt.Sprintf("%s", cf_app_id)

if cf_app_id != nil && appGuid != "<nil>" && cf_app_id != "" {
appInfo, err := appCache.GetApp(appGuid)
if err != nil {
if err == cache.ErrMissingAndIgnored {
logrus.Info(err.Error(), cf_app_id)
} else {
logrus.Error("Failed to fetch application metadata from remote: ", err)
}
return
} else if appInfo == nil {
return
}
cf_app_name := appInfo.Name
cf_space_id := appInfo.SpaceGuid
cf_space_name := appInfo.SpaceName
cf_org_id := appInfo.OrgGuid
cf_org_name := appInfo.OrgName
cf_ignored_app := appInfo.IgnoredApp
app_env := appInfo.CfAppEnv

if cf_app_name != "" && config.AddAppName {
e.Fields["cf_app_name"] = cf_app_name
}
cfAppId := e.Fields["cf_app_id"]
appGuid := fmt.Sprintf("%s", cfAppId)

if cf_space_id != "" && config.AddSpaceGuid {
e.Fields["cf_space_id"] = cf_space_id
}
if cfAppId == nil || cfAppId == "" || appGuid == "<nil>" {
return
}

if cf_space_name != "" && config.AddSpaceName {
e.Fields["cf_space_name"] = cf_space_name
appInfo, err := appCache.GetApp(appGuid)
if err != nil {
if err == cache.ErrMissingAndIgnored {
logrus.Info(err.Error(), cfAppId)
} else {
logrus.Error("Failed to fetch application metadata from remote: ", err)
}
return
} else if appInfo == nil {
return
}

if cf_org_id != "" && config.AddOrgGuid {
e.Fields["cf_org_id"] = cf_org_id
}
e.parseAndAnnotateWithAppInfo(appInfo, config)
}

if cf_org_name != "" && config.AddOrgName {
e.Fields["cf_org_name"] = cf_org_name
}
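// parseAndAnnotateWithAppInfo copies cached app metadata onto the event's
// fields, honoring the Add* config toggles and a per-app SPLUNK_INDEX override.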
func (e *Event) parseAndAnnotateWithAppInfo(appInfo *cache.App, config *Config) {
cfAppName := appInfo.Name
cfSpaceId := appInfo.SpaceGuid
cfSpaceName := appInfo.SpaceName
cfOrgId := appInfo.OrgGuid
cfOrgName := appInfo.OrgName
cfIgnoredApp := appInfo.IgnoredApp
appEnv := appInfo.CfAppEnv

if cfAppName != "" && config.AddAppName {
e.Fields["cf_app_name"] = cfAppName
}

if app_env["SPLUNK_INDEX"] != nil {
e.Fields["info_splunk_index"] = app_env["SPLUNK_INDEX"]
}
if cfSpaceId != "" && config.AddSpaceGuid {
e.Fields["cf_space_id"] = cfSpaceId
}

if cf_ignored_app != false {
e.Fields["cf_ignored_app"] = cf_ignored_app
}
if cfSpaceName != "" && config.AddSpaceName {
e.Fields["cf_space_name"] = cfSpaceName
}

if cfOrgId != "" && config.AddOrgGuid {
e.Fields["cf_org_id"] = cfOrgId
}

if cfOrgName != "" && config.AddOrgName {
e.Fields["cf_org_name"] = cfOrgName
}

if appEnv["SPLUNK_INDEX"] != nil {
e.Fields["info_splunk_index"] = appEnv["SPLUNK_INDEX"]
}

if cfIgnoredApp {
e.Fields["cf_ignored_app"] = cfIgnoredApp
}
}

26 changes: 14 additions & 12 deletions eventsink/splunk.go
@@ -22,18 +22,20 @@ import (
const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4"

type SplunkConfig struct {
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
FlushInterval time.Duration
QueueSize int // consumer queue buffer size
BatchSize int
Retries int // No of retries to post events to HEC before dropping events
Hostname string
SubscriptionID string
ExtraFields map[string]string
TraceLogging bool
UUID string
Logger lager.Logger
StatusMonitorInterval time.Duration
LoggingIndex string
RefreshSplunkConnection bool          // if true, periodically refresh the Splunk connection
KeepAliveTimer          time.Duration // refresh interval used when RefreshSplunkConnection is true
}

type ParseConfig = fevents.Config
61 changes: 38 additions & 23 deletions eventwriter/splunk_event.go
@@ -8,20 +8,25 @@ import (
"fmt"
"io"
"net/http"
"time"

"code.cloudfoundry.org/cfhttp"
"code.cloudfoundry.org/lager"
"github.com/cloudfoundry-community/splunk-firehose-nozzle/utils"
)

var keepAliveTimer = time.Now()

type SplunkConfig struct {
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
Host string
Token string
Index string
Fields map[string]string
SkipSSL bool
Debug bool
Version string
RefreshSplunkConnection bool          // if true, periodically drop and re-dial the HEC connection
KeepAliveTimer          time.Duration // how often to refresh when RefreshSplunkConnection is true

Logger lager.Logger
}
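
A minimal sketch of wiring the two new options, assuming this repository's import path; host and token values are placeholders:

```
package main

import (
	"time"

	"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventwriter"
)

func main() {
	// Placeholder values; Logger and the remaining fields are omitted here.
	_ = eventwriter.SplunkConfig{
		Host:                    "https://splunk.example.com:8088",
		Token:                   "hec-token",
		RefreshSplunkConnection: true,             // drop and re-dial periodically
		KeepAliveTimer:          30 * time.Second, // refresh cadence
	}
}
```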
@@ -52,57 +57,7 @@ func (s *SplunkEvent) Write(events []map[string]interface{}) (error, uint64) {
bodyBuffer := new(bytes.Buffer)
count := uint64(len(events))
for i, event := range events {

if _, ok := event["index"]; !ok {
if event["event"].(map[string]interface{})["info_splunk_index"] != nil {
event["index"] = event["event"].(map[string]interface{})["info_splunk_index"]
} else if s.config.Index != "" {
event["index"] = s.config.Index
}
}

if len(s.config.Fields) > 0 {
event["fields"] = s.config.Fields
}
s.parseEvent(&event)

eventJson, err := json.Marshal(event)
if err == nil {
@@ -90,6 +84,22 @@
}
}

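// parseEvent resolves the event's target index (a per-app SPLUNK_INDEX
// override wins over the configured default) and attaches any extra HEC fields.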
func (s *SplunkEvent) parseEvent(event *map[string]interface{}) error {
if _, ok := (*event)["index"]; !ok {
if (*event)["event"].(map[string]interface{})["info_splunk_index"] != nil {
(*event)["index"] = (*event)["event"].(map[string]interface{})["info_splunk_index"]
} else if s.config.Index != "" {
(*event)["index"] = s.config.Index
}
}

if len(s.config.Fields) > 0 {
(*event)["fields"] = s.config.Fields
}

return nil
}
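
To make the index precedence concrete, a standalone illustration (not the nozzle's code) using plain maps:

```
package main

import "fmt"

// resolveIndex mimics parseEvent's precedence: a per-app info_splunk_index
// wins over the writer's configured default index.
func resolveIndex(event map[string]interface{}, configured string) string {
	if inner, ok := event["event"].(map[string]interface{}); ok {
		if idx, ok := inner["info_splunk_index"].(string); ok && idx != "" {
			return idx
		}
	}
	return configured
}

func main() {
	withOverride := map[string]interface{}{
		"event": map[string]interface{}{"info_splunk_index": "app_index"},
	}
	withoutOverride := map[string]interface{}{
		"event": map[string]interface{}{},
	}
	fmt.Println(resolveIndex(withOverride, "main"))    // app_index
	fmt.Println(resolveIndex(withoutOverride, "main")) // main
}
```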

func (s *SplunkEvent) send(postBody *[]byte) error {
endpoint := fmt.Sprintf("%s/services/collector", s.config.Host)
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(*postBody))
Expand All @@ -113,10 +123,15 @@ func (s *SplunkEvent) send(postBody *[]byte) error {
responseBody, _ := io.ReadAll(resp.Body)
return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody))
} else {
//Draining the response buffer, so that the same connection can be reused the next time
_, err := io.Copy(io.Discard, resp.Body)
if err != nil {
s.config.Logger.Error("Error discarding response body", err)
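// When a refresh is due, the response body is deliberately left undrained:
// Go's HTTP transport will not reuse a connection whose body was not read to
// EOF, so the next send dials a fresh connection to Splunk.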
if s.config.RefreshSplunkConnection && time.Now().After(keepAliveTimer) {
if s.config.KeepAliveTimer > 0 {
keepAliveTimer = time.Now().Add(s.config.KeepAliveTimer)
}
} else {
//Draining the response buffer, so that the same connection can be reused the next time
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
s.config.Logger.Error("Error discarding response body", err)
}
}
}
s.BodyBufferSize.Add(uint64(len(*postBody)))