Skip to content

Commit

Permalink
[TT-10520] Migrating from go-redis to storage library (TykTechnologie…
Browse files Browse the repository at this point in the history
…s#765)

* migrating from go-redis to storage library

* fixing issues

* linting

* fixing compilation error

* linting

* linting part 2

* adding constructor unit test

* Updating storage to v1.1.0

* Fixing GetAndDeleteSet method

* Modifying errors to 'fatal' type

* Running go mod tidy

* improving tests

* improving tests

* linting

* removing fatal error

* linting

* fixing test

* adding TLS Options

* linting

* simplifying code

* Improving code readingness

* Making model.Connector global variable a singleton

* Update dependencies

* removing commented code

* linting
  • Loading branch information
mativm02 authored Jan 26, 2024
1 parent bf9e7e7 commit 407c373
Show file tree
Hide file tree
Showing 16 changed files with 842 additions and 574 deletions.
2 changes: 0 additions & 2 deletions analytics/aggregate_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package analytics

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -326,7 +325,6 @@ func TestAggregateGraphData_Dimension(t *testing.T) {
r.Len(aggregated, 1)
aggre := aggregated["test-api"]
dimensions := aggre.Dimensions()
fmt.Println(dimensions)
for d, values := range responsesCheck {
for _, v := range values {
found := false
Expand Down
8 changes: 4 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type TykPumpConfiguration struct {
// Sets the analytics storage type. Where the pump will be fetching data from. Currently, only
// the `redis` option is supported.
AnalyticsStorageType string `json:"analytics_storage_type"`
// Example Redis storage configuration:
// Example Temporal storage configuration:
// ```{.json}
// "analytics_storage_config": {
// "type": "redis",
Expand All @@ -188,11 +188,11 @@ type TykPumpConfiguration struct {
// "optimisation_max_idle": 100,
// "optimisation_max_active": 0,
// "enable_cluster": false,
// "redis_use_ssl": false,
// "redis_ssl_insecure_skip_verify": false
// "use_ssl": false,
// "ssl_insecure_skip_verify": false
// },
// ```
AnalyticsStorageConfig storage.RedisStorageConfig `json:"analytics_storage_config"`
AnalyticsStorageConfig storage.TemporalStorageConfig `json:"analytics_storage_config"`
// Connection string for StatsD monitoring for information please see the
// [Instrumentation docs](https://tyk.io/docs/basic-config-and-security/report-monitor-trigger-events/instrumentation/).
StatsdConnectionString string `json:"statsd_connection_string"`
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ require (
github.com/DataDog/datadog-go v4.7.0+incompatible
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.0.8
github.com/TykTechnologies/storage v1.1.1
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.5.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/fatih/structs v1.1.0
github.com/go-redis/redis/v8 v8.3.1
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0
github.com/gofrs/uuid v4.0.0+incompatible
github.com/golang/protobuf v1.5.3
Expand Down Expand Up @@ -100,13 +99,13 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/olivere/elastic v6.2.31+incompatible // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.20.0 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/redis/go-redis/v9 v9.3.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
Expand All @@ -120,7 +119,6 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.11.2 // indirect
go.opentelemetry.io/otel v0.13.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sync v0.2.0 // indirect
Expand Down
44 changes: 15 additions & 29 deletions go.sum

Large diffs are not rendered by default.

111 changes: 77 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var SystemConfig TykPumpConfiguration
var AnalyticsStore storage.AnalyticsStorage
var UptimeStorage storage.AnalyticsStorage
var Pumps []pumps.Pump
var UptimePump pumps.UptimePump
var AnalyticsSerializers []serializer.AnalyticsSerializer
var (
SystemConfig TykPumpConfiguration
AnalyticsStore storage.AnalyticsStorage
UptimeStorage storage.AnalyticsStorage
Pumps []pumps.Pump
UptimePump pumps.UptimePump
AnalyticsSerializers []serializer.AnalyticsSerializer
)

var log = logger.GetLogger()

Expand Down Expand Up @@ -66,7 +68,7 @@ func Init() {
demoMode = &envDemo
}

//Serializer init
// Serializer init
AnalyticsSerializers = []serializer.AnalyticsSerializer{serializer.NewAnalyticsSerializer(serializer.MSGP_SERIALIZER), serializer.NewAnalyticsSerializer(serializer.PROTOBUF_SERIALIZER)}

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -96,36 +98,69 @@ func Init() {
if *debugMode {
log.Level = logrus.DebugLevel
}

}

func setupAnalyticsStore() {
switch SystemConfig.AnalyticsStorageType {
case "redis":
AnalyticsStore = &storage.RedisClusterStorageManager{}
UptimeStorage = &storage.RedisClusterStorageManager{}
default:
AnalyticsStore = &storage.RedisClusterStorageManager{}
UptimeStorage = &storage.RedisClusterStorageManager{}
}
case "redis", "":
var err error
AnalyticsStore, err = storage.NewTemporalStorageHandler(SystemConfig.AnalyticsStorageConfig, false)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}
err = AnalyticsStore.Init()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

// Copy across the redis configuration
uptimeConf := SystemConfig.AnalyticsStorageConfig
// Swap key prefixes for uptime purger
uptimeConf.KeyPrefix = "host-checker:"

AnalyticsStore.Init(SystemConfig.AnalyticsStorageConfig)
UptimeStorage, err = storage.NewTemporalStorageHandler(uptimeConf, false)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

// Copy across the redis configuration
uptimeConf := SystemConfig.AnalyticsStorageConfig
err = UptimeStorage.Init()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Redis: ", err)
}

// Swap key prefixes for uptime purger
uptimeConf.RedisKeyPrefix = "host-checker:"
UptimeStorage.Init(uptimeConf)
default:
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Invalid analytics storage type: ", SystemConfig.AnalyticsStorageType)
}
}

func storeVersion() {
var versionStore = &storage.RedisClusterStorageManager{}
versionConf := SystemConfig.AnalyticsStorageConfig
versionStore.KeyPrefix = "version-check-"
versionStore.Config = versionConf
versionStore.Connect()
err := versionStore.SetKey("pump", pumps.Version, 0)
versionConf := &SystemConfig.AnalyticsStorageConfig
versionConf.KeyPrefix = "version-check-"
versionStore, err := storage.NewTemporalStorageHandler(versionConf, false)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

err = versionStore.Init()
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Fatal("Error connecting to Temporal Storage: ", err)
}

err = versionStore.SetKey("pump", pumps.Version, 0)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
Expand Down Expand Up @@ -177,7 +212,6 @@ func initialisePumps() {
if !SystemConfig.DontPurgeUptimeData {
initialiseUptimePump()
}

}

func initialiseUptimePump() {
Expand Down Expand Up @@ -210,15 +244,20 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch
for i := -1; i < 10; i++ {
var analyticsKeyName string
if i == -1 {
//if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
// if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
analyticsKeyName = storage.ANALYTICS_KEYNAME
} else {
analyticsKeyName = fmt.Sprintf("%v_%v", storage.ANALYTICS_KEYNAME, i)
}

for _, serializerMethod := range AnalyticsSerializers {
analyticsKeyName += serializerMethod.GetSuffix()
AnalyticsValues := AnalyticsStore.GetAndDeleteSet(analyticsKeyName, chunkSize, expire)
AnalyticsValues, err := AnalyticsStore.GetAndDeleteSet(analyticsKeyName, chunkSize, expire)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Error("Error on Purge Loop. Is Temporal Storage down?: " + err.Error())
}
if len(AnalyticsValues) > 0 {
PreprocessAnalyticsValues(AnalyticsValues, serializerMethod, analyticsKeyName, omitDetails, job, startTime, secInterval)
}
Expand All @@ -229,7 +268,12 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch
job.Timing("purge_time_all", time.Since(startTime).Nanoseconds())

if !SystemConfig.DontPurgeUptimeData {
UptimeValues := UptimeStorage.GetAndDeleteSet(storage.UptimeAnalytics_KEYNAME, chunkSize, expire)
UptimeValues, err := UptimeStorage.GetAndDeleteSet(storage.UptimeAnalytics_KEYNAME, chunkSize, expire)
if err != nil {
log.WithFields(logrus.Fields{
"prefix": mainPrefix,
}).Error("Error on Purge Loop. Is Temporal Storage down?: " + err.Error())
}
UptimePump.WriteUptimeData(UptimeValues)
}

Expand Down Expand Up @@ -305,7 +349,6 @@ func writeToPumps(keys []interface{}, job *health.Job, startTime time.Time, purg
}

func filterData(pump pumps.Pump, keys []interface{}) []interface{} {

shouldTrim := SystemConfig.MaxRecordSize != 0 || pump.GetMaxRecordSize() != 0
filters := pump.GetFilters()
ignoreFields := pump.GetIgnoreFields()
Expand Down Expand Up @@ -381,11 +424,11 @@ func execPumpWriting(wg *sync.WaitGroup, pmp pumps.Pump, keys *[]interface{}, pu
}).Debug("Writing to: ", pmp.GetName())

ch := make(chan error, 1)
//Load pump timeout
// Load pump timeout
timeout := pmp.GetTimeout()
var ctx context.Context
var cancel context.CancelFunc
//Initialize context depending if the pump has a configured timeout
// Initialize context depending if the pump has a configured timeout
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
} else {
Expand Down
3 changes: 2 additions & 1 deletion pumps/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"strings"

"github.com/TykTechnologies/tyk-pump/analytics"
retry "github.com/TykTechnologies/tyk-pump/http-retry"
"github.com/TykTechnologies/tyk-pump/retry"

"github.com/mitchellh/mapstructure"
)

Expand Down
4 changes: 0 additions & 4 deletions pumps/splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pumps
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -54,7 +53,6 @@ func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body.Close()

if h.returnErrors >= h.reqCount {
fmt.Println("returning err.......")
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte("splunk internal error"))
if err != nil {
Expand Down Expand Up @@ -249,8 +247,6 @@ func Test_SplunkWriteDataBatch(t *testing.T) {
keys[1] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()}
keys[2] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()}

fmt.Println(maxContentLength)

pmp := SplunkPump{}

cfg := make(map[string]interface{})
Expand Down
2 changes: 1 addition & 1 deletion pumps/version.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pumps

var (
Version = "v1.8.0"
Version = "v1.9.0"
BuiltBy string
Commit string
BuildDate string
Expand Down
2 changes: 1 addition & 1 deletion http-retry/http-retry.go → retry/http-retry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package httpretry
package retry

import (
"bytes"
Expand Down
16 changes: 16 additions & 0 deletions retry/storage-retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package retry

import (
"time"

"github.com/cenkalti/backoff/v4"
)

func GetTemporalStorageExponentialBackoff() *backoff.ExponentialBackOff {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.Multiplier = 2
exponentialBackoff.MaxInterval = 10 * time.Second
exponentialBackoff.MaxElapsedTime = 0

return exponentialBackoff
}
15 changes: 0 additions & 15 deletions storage/init.go

This file was deleted.

Loading

0 comments on commit 407c373

Please sign in to comment.