From 407c3739e7cb84126de4f67fe58d8ca78bdc54ee Mon Sep 17 00:00:00 2001 From: Matias Date: Fri, 26 Jan 2024 08:37:11 -0300 Subject: [PATCH] [TT-10520] Migrating from go-redis to storage library (#765) * migrating from go-redis to storage library * fixing issues * linting * fixing compilation error * linting * linting part 2 * adding constructor unit test * Updating storage to v1.1.0 * Fixing GetAndDeleteSet method * Modifying errors to 'fatal' type * Running go mod tidy * improving tests * improving tests * linting * removing fatal error * linting * fixing test * adding TLS Options * linting * simplifying code * Improving code readingness * Making model.Connector global variable a singleton * Update dependencies * removing commented code * linting --- analytics/aggregate_test.go | 2 - config.go | 8 +- go.mod | 6 +- go.sum | 44 ++-- main.go | 111 ++++++--- pumps/splunk.go | 3 +- pumps/splunk_test.go | 4 - pumps/version.go | 2 +- {http-retry => retry}/http-retry.go | 2 +- retry/storage-retry.go | 16 ++ storage/init.go | 15 -- storage/redis.go | 332 --------------------------- storage/redis_test.go | 142 ------------ storage/store.go | 93 +++++++- storage/temporal_storage.go | 340 ++++++++++++++++++++++++++++ storage/temporal_storage_test.go | 296 ++++++++++++++++++++++++ 16 files changed, 842 insertions(+), 574 deletions(-) rename {http-retry => retry}/http-retry.go (99%) create mode 100644 retry/storage-retry.go delete mode 100644 storage/init.go delete mode 100644 storage/redis.go delete mode 100644 storage/redis_test.go create mode 100644 storage/temporal_storage.go create mode 100644 storage/temporal_storage_test.go diff --git a/analytics/aggregate_test.go b/analytics/aggregate_test.go index 24240b2b6..c9401d957 100644 --- a/analytics/aggregate_test.go +++ b/analytics/aggregate_test.go @@ -1,7 +1,6 @@ package analytics import ( - "fmt" "testing" "time" @@ -326,7 +325,6 @@ func TestAggregateGraphData_Dimension(t *testing.T) { r.Len(aggregated, 1) aggre := aggregated["test-api"] dimensions := aggre.Dimensions() - fmt.Println(dimensions) for d, values := range responsesCheck { for _, v := range values { found := false diff --git a/config.go b/config.go index bdc3c3fdd..762c7a4b4 100644 --- a/config.go +++ b/config.go @@ -175,7 +175,7 @@ type TykPumpConfiguration struct { // Sets the analytics storage type. Where the pump will be fetching data from. Currently, only // the `redis` option is supported. AnalyticsStorageType string `json:"analytics_storage_type"` - // Example Redis storage configuration: + // Example Temporal storage configuration: // ```{.json} // "analytics_storage_config": { // "type": "redis", @@ -188,11 +188,11 @@ type TykPumpConfiguration struct { // "optimisation_max_idle": 100, // "optimisation_max_active": 0, // "enable_cluster": false, - // "redis_use_ssl": false, - // "redis_ssl_insecure_skip_verify": false + // "use_ssl": false, + // "ssl_insecure_skip_verify": false // }, // ``` - AnalyticsStorageConfig storage.RedisStorageConfig `json:"analytics_storage_config"` + AnalyticsStorageConfig storage.TemporalStorageConfig `json:"analytics_storage_config"` // Connection string for StatsD monitoring for information please see the // [Instrumentation docs](https://tyk.io/docs/basic-config-and-security/report-monitor-trigger-events/instrumentation/). StatsdConnectionString string `json:"statsd_connection_string"` diff --git a/go.mod b/go.mod index c388c718e..d960621e5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DataDog/datadog-go v4.7.0+incompatible github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 - github.com/TykTechnologies/storage v1.0.8 + github.com/TykTechnologies/storage v1.1.1 github.com/aws/aws-sdk-go-v2 v1.22.1 github.com/aws/aws-sdk-go-v2/config v1.9.0 github.com/aws/aws-sdk-go-v2/credentials v1.5.0 @@ -14,7 +14,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 github.com/cenkalti/backoff/v4 v4.0.2 github.com/fatih/structs v1.1.0 - github.com/go-redis/redis/v8 v8.3.1 github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 github.com/gofrs/uuid v4.0.0+incompatible github.com/golang/protobuf v1.5.3 @@ -100,13 +99,13 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/olivere/elastic v6.2.31+incompatible // indirect - github.com/onsi/ginkgo v1.16.4 // indirect github.com/onsi/gomega v1.20.0 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/redis/go-redis/v9 v9.3.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect @@ -120,7 +119,6 @@ require ( github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.mongodb.org/mongo-driver v1.11.2 // indirect - go.opentelemetry.io/otel v0.13.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/sync v0.2.0 // indirect diff --git a/go.sum b/go.sum index 3d8cac877..ed52db105 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,10 @@ github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 h1:fbxHiuw/2 github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9/go.mod h1:v6v7Mlj08+EmEcXOfpuTxGt2qYU9yhqqtv4QF9Wf50E= github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 h1:T5NWziFusj8au5nxAqMMh/bZyX9CAyYnBkaMSsfH6BA= github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632/go.mod h1:UsPYgOFBpNzDXLEti7MKOwHLpVSqdzuNGkVFPspQmnQ= -github.com/TykTechnologies/storage v1.0.8 h1:MBs6hk5oLOmr2qK5/rl+dYO6iDMez6u3QkwOCL6K8n8= -github.com/TykTechnologies/storage v1.0.8/go.mod h1:+0S3KuNlLGBTMTSFREuZFm315zzXjuuCO4QSAPy+d3M= +github.com/TykTechnologies/storage v1.1.0 h1:OgXbnuA0QCWWROtLRlztNTX6kKC3GkAwEEdPmvuAPE4= +github.com/TykTechnologies/storage v1.1.0/go.mod h1:zcANqpIsDL/l/1zsMMERmjBeJYpER9XMi/dw2Gqa7m4= +github.com/TykTechnologies/storage v1.1.1 h1:mD9etJLHsXbluQIDX8/05Lk+ciqaAMXL5Pts9Wy+WAU= +github.com/TykTechnologies/storage v1.1.1/go.mod h1:zcANqpIsDL/l/1zsMMERmjBeJYpER9XMi/dw2Gqa7m4= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= @@ -100,12 +102,15 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -145,8 +150,6 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= @@ -163,13 +166,10 @@ github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= -github.com/go-redis/redis/v8 v8.3.1 h1:jEPCgHQopfNaABun3NVN9pv2K7RjstY/7UJD6UEKFEY= -github.com/go-redis/redis/v8 v8.3.1/go.mod h1:a2xkpBM7NJUN5V5kiF46X5Ltx4WeXJ9757X/ScKUBdE= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7VGI1HksxDJqSjaGED3cSw9GeSI98= github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -197,7 +197,6 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -238,6 +237,7 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/helloeave/json v1.15.3 h1:roUxUEGhsSvhuhi80c4qmLiW633d5uf0mkzUGzBMfX8= github.com/helloeave/json v1.15.3/go.mod h1:uTHhuUsgnrpm9cc7Gi3tfIUwgf1dq/7+uLfpUFLBFEQ= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -341,6 +341,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -396,9 +397,6 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6f github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/olivere/elastic v6.2.31+incompatible h1:zwJIIsgfiDBuDS3sb6MCbm/e03BPEJoGZvqevZXM254= @@ -407,15 +405,9 @@ github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS github.com/olivere/elastic/v7 v7.0.28 h1:KAP4EuaEcvPJknRNkAAso1xeu0C1+/CeDQsxj9Cw9Fg= github.com/olivere/elastic/v7 v7.0.28/go.mod h1:DzHQoqd6YqSuvF1lk/fR4cW4FNUNzSD5/F5MBm3GRMo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q= github.com/onsi/gomega v1.20.0/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -457,6 +449,8 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2 h1:IvjiJDGCF8L8TjKHQKmLAjWztpKDCAaRifiRMdGzWk0= github.com/quipo/statsd v0.0.0-20160923160612-75b7afedf0d2/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0= +github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= +github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/resurfaceio/logger-go/v3 v3.2.1 h1:tTPvGp+FpH35aaT/nnhP4n/Rh/f1vHe64WoXTDgv0fY= github.com/resurfaceio/logger-go/v3 v3.2.1/go.mod h1:YPcxFUcloW37F1WQA9MUcGWu2JzlvBxlCfFF5+T3GO8= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= @@ -555,8 +549,6 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v0.13.0 h1:2isEnyzjjJZq6r2EKMsFj4TxiQiexsM04AVhwbR/oBA= -go.opentelemetry.io/otel v0.13.0/go.mod h1:dlSNewoRYikTkotEnxdmuBHgzT+k/idJSfDv/FxEnOY= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -637,7 +629,6 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -679,21 +670,16 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200107162124-548cf772de50/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -747,7 +733,6 @@ golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -816,7 +801,9 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw= @@ -833,7 +820,6 @@ gopkg.in/vmihailenco/msgpack.v2 v2.9.1 h1:kb0VV7NuIojvRfzwslQeP3yArBqJHW9tOl4t38 gopkg.in/vmihailenco/msgpack.v2 v2.9.1/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/main.go b/main.go index 31bdb2818..824fd2a47 100644 --- a/main.go +++ b/main.go @@ -23,12 +23,14 @@ import ( kingpin "gopkg.in/alecthomas/kingpin.v2" ) -var SystemConfig TykPumpConfiguration -var AnalyticsStore storage.AnalyticsStorage -var UptimeStorage storage.AnalyticsStorage -var Pumps []pumps.Pump -var UptimePump pumps.UptimePump -var AnalyticsSerializers []serializer.AnalyticsSerializer +var ( + SystemConfig TykPumpConfiguration + AnalyticsStore storage.AnalyticsStorage + UptimeStorage storage.AnalyticsStorage + Pumps []pumps.Pump + UptimePump pumps.UptimePump + AnalyticsSerializers []serializer.AnalyticsSerializer +) var log = logger.GetLogger() @@ -66,7 +68,7 @@ func Init() { demoMode = &envDemo } - //Serializer init + // Serializer init AnalyticsSerializers = []serializer.AnalyticsSerializer{serializer.NewAnalyticsSerializer(serializer.MSGP_SERIALIZER), serializer.NewAnalyticsSerializer(serializer.PROTOBUF_SERIALIZER)} log.WithFields(logrus.Fields{ @@ -96,36 +98,69 @@ func Init() { if *debugMode { log.Level = logrus.DebugLevel } - } func setupAnalyticsStore() { switch SystemConfig.AnalyticsStorageType { - case "redis": - AnalyticsStore = &storage.RedisClusterStorageManager{} - UptimeStorage = &storage.RedisClusterStorageManager{} - default: - AnalyticsStore = &storage.RedisClusterStorageManager{} - UptimeStorage = &storage.RedisClusterStorageManager{} - } + case "redis", "": + var err error + AnalyticsStore, err = storage.NewTemporalStorageHandler(SystemConfig.AnalyticsStorageConfig, false) + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Error connecting to Temporal Storage: ", err) + } + err = AnalyticsStore.Init() + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Error connecting to Temporal Storage: ", err) + } + + // Copy across the redis configuration + uptimeConf := SystemConfig.AnalyticsStorageConfig + // Swap key prefixes for uptime purger + uptimeConf.KeyPrefix = "host-checker:" - AnalyticsStore.Init(SystemConfig.AnalyticsStorageConfig) + UptimeStorage, err = storage.NewTemporalStorageHandler(uptimeConf, false) + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Error connecting to Temporal Storage: ", err) + } - // Copy across the redis configuration - uptimeConf := SystemConfig.AnalyticsStorageConfig + err = UptimeStorage.Init() + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Error connecting to Redis: ", err) + } - // Swap key prefixes for uptime purger - uptimeConf.RedisKeyPrefix = "host-checker:" - UptimeStorage.Init(uptimeConf) + default: + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Invalid analytics storage type: ", SystemConfig.AnalyticsStorageType) + } } func storeVersion() { - var versionStore = &storage.RedisClusterStorageManager{} - versionConf := SystemConfig.AnalyticsStorageConfig - versionStore.KeyPrefix = "version-check-" - versionStore.Config = versionConf - versionStore.Connect() - err := versionStore.SetKey("pump", pumps.Version, 0) + versionConf := &SystemConfig.AnalyticsStorageConfig + versionConf.KeyPrefix = "version-check-" + versionStore, err := storage.NewTemporalStorageHandler(versionConf, false) + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Error connecting to Temporal Storage: ", err) + } + + err = versionStore.Init() + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Fatal("Error connecting to Temporal Storage: ", err) + } + + err = versionStore.SetKey("pump", pumps.Version, 0) if err != nil { log.WithFields(logrus.Fields{ "prefix": mainPrefix, @@ -177,7 +212,6 @@ func initialisePumps() { if !SystemConfig.DontPurgeUptimeData { initialiseUptimePump() } - } func initialiseUptimePump() { @@ -210,7 +244,7 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch for i := -1; i < 10; i++ { var analyticsKeyName string if i == -1 { - //if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway + // if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway analyticsKeyName = storage.ANALYTICS_KEYNAME } else { analyticsKeyName = fmt.Sprintf("%v_%v", storage.ANALYTICS_KEYNAME, i) @@ -218,7 +252,12 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch for _, serializerMethod := range AnalyticsSerializers { analyticsKeyName += serializerMethod.GetSuffix() - AnalyticsValues := AnalyticsStore.GetAndDeleteSet(analyticsKeyName, chunkSize, expire) + AnalyticsValues, err := AnalyticsStore.GetAndDeleteSet(analyticsKeyName, chunkSize, expire) + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Error("Error on Purge Loop. Is Temporal Storage down?: " + err.Error()) + } if len(AnalyticsValues) > 0 { PreprocessAnalyticsValues(AnalyticsValues, serializerMethod, analyticsKeyName, omitDetails, job, startTime, secInterval) } @@ -229,7 +268,12 @@ func StartPurgeLoop(wg *sync.WaitGroup, ctx context.Context, secInterval int, ch job.Timing("purge_time_all", time.Since(startTime).Nanoseconds()) if !SystemConfig.DontPurgeUptimeData { - UptimeValues := UptimeStorage.GetAndDeleteSet(storage.UptimeAnalytics_KEYNAME, chunkSize, expire) + UptimeValues, err := UptimeStorage.GetAndDeleteSet(storage.UptimeAnalytics_KEYNAME, chunkSize, expire) + if err != nil { + log.WithFields(logrus.Fields{ + "prefix": mainPrefix, + }).Error("Error on Purge Loop. Is Temporal Storage down?: " + err.Error()) + } UptimePump.WriteUptimeData(UptimeValues) } @@ -305,7 +349,6 @@ func writeToPumps(keys []interface{}, job *health.Job, startTime time.Time, purg } func filterData(pump pumps.Pump, keys []interface{}) []interface{} { - shouldTrim := SystemConfig.MaxRecordSize != 0 || pump.GetMaxRecordSize() != 0 filters := pump.GetFilters() ignoreFields := pump.GetIgnoreFields() @@ -381,11 +424,11 @@ func execPumpWriting(wg *sync.WaitGroup, pmp pumps.Pump, keys *[]interface{}, pu }).Debug("Writing to: ", pmp.GetName()) ch := make(chan error, 1) - //Load pump timeout + // Load pump timeout timeout := pmp.GetTimeout() var ctx context.Context var cancel context.CancelFunc - //Initialize context depending if the pump has a configured timeout + // Initialize context depending if the pump has a configured timeout if timeout > 0 { ctx, cancel = context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second) } else { diff --git a/pumps/splunk.go b/pumps/splunk.go index 1c7c27218..ab4972edc 100644 --- a/pumps/splunk.go +++ b/pumps/splunk.go @@ -11,7 +11,8 @@ import ( "strings" "github.com/TykTechnologies/tyk-pump/analytics" - retry "github.com/TykTechnologies/tyk-pump/http-retry" + "github.com/TykTechnologies/tyk-pump/retry" + "github.com/mitchellh/mapstructure" ) diff --git a/pumps/splunk_test.go b/pumps/splunk_test.go index 81a5ebbc7..9ec67513a 100644 --- a/pumps/splunk_test.go +++ b/pumps/splunk_test.go @@ -3,7 +3,6 @@ package pumps import ( "context" "encoding/json" - "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -54,7 +53,6 @@ func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body.Close() if h.returnErrors >= h.reqCount { - fmt.Println("returning err.......") w.WriteHeader(http.StatusInternalServerError) _, err := w.Write([]byte("splunk internal error")) if err != nil { @@ -249,8 +247,6 @@ func Test_SplunkWriteDataBatch(t *testing.T) { keys[1] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} keys[2] = analytics.AnalyticsRecord{OrgID: "1", APIID: "123", Path: "/test-path", Method: "POST", TimeStamp: time.Now()} - fmt.Println(maxContentLength) - pmp := SplunkPump{} cfg := make(map[string]interface{}) diff --git a/pumps/version.go b/pumps/version.go index 119270cfb..82baeec46 100644 --- a/pumps/version.go +++ b/pumps/version.go @@ -1,7 +1,7 @@ package pumps var ( - Version = "v1.8.0" + Version = "v1.9.0" BuiltBy string Commit string BuildDate string diff --git a/http-retry/http-retry.go b/retry/http-retry.go similarity index 99% rename from http-retry/http-retry.go rename to retry/http-retry.go index 3a6bdd3e7..50908e9a8 100644 --- a/http-retry/http-retry.go +++ b/retry/http-retry.go @@ -1,4 +1,4 @@ -package httpretry +package retry import ( "bytes" diff --git a/retry/storage-retry.go b/retry/storage-retry.go new file mode 100644 index 000000000..ab3ef6131 --- /dev/null +++ b/retry/storage-retry.go @@ -0,0 +1,16 @@ +package retry + +import ( + "time" + + "github.com/cenkalti/backoff/v4" +) + +func GetTemporalStorageExponentialBackoff() *backoff.ExponentialBackOff { + exponentialBackoff := backoff.NewExponentialBackOff() + exponentialBackoff.Multiplier = 2 + exponentialBackoff.MaxInterval = 10 * time.Second + exponentialBackoff.MaxElapsedTime = 0 + + return exponentialBackoff +} diff --git a/storage/init.go b/storage/init.go deleted file mode 100644 index bd853ca88..000000000 --- a/storage/init.go +++ /dev/null @@ -1,15 +0,0 @@ -package storage - -import ( - "github.com/TykTechnologies/tyk-pump/logger" -) - -var log = logger.GetLogger() -var AvailableStores map[string]AnalyticsStorage - -func init() { - AvailableStores = make(map[string]AnalyticsStorage) - - // Register all the storage handlers here - AvailableStores["redis"] = &RedisClusterStorageManager{} -} diff --git a/storage/redis.go b/storage/redis.go deleted file mode 100644 index 70accc0b9..000000000 --- a/storage/redis.go +++ /dev/null @@ -1,332 +0,0 @@ -package storage - -import ( - "context" - "crypto/tls" - "strconv" - "strings" - "time" - - "github.com/go-redis/redis/v8" - "github.com/sirupsen/logrus" - - "github.com/kelseyhightower/envconfig" - "github.com/mitchellh/mapstructure" -) - -// ------------------- REDIS CLUSTER STORAGE MANAGER ------------------------------- - -var redisClusterSingleton redis.UniversalClient -var redisLogPrefix = "redis" -var ENV_REDIS_PREFIX = "TYK_PMP_REDIS" -var ctx = context.Background() - -type EnvMapString map[string]string - -func (e *EnvMapString) Decode(value string) error { - units := strings.Split(value, ",") - m := make(map[string]string) - for _, unit := range units { - kvArr := strings.Split(unit, ":") - if len(kvArr) > 1 { - m[kvArr[0]] = kvArr[1] - } - } - - *e = m - - return nil -} - -type RedisStorageConfig struct { - // Deprecated. - Type string `json:"type" mapstructure:"type"` - // Redis host value. - Host string `json:"host" mapstructure:"host"` - // Redis port value. - Port int `json:"port" mapstructure:"port"` - // Deprecated. Use Addrs instead. - Hosts EnvMapString `json:"hosts" mapstructure:"hosts"` - // Use instead of the host value if you're running a redis cluster with mutliple instances. - Addrs []string `json:"addrs" mapstructure:"addrs"` - // Sentinel redis master name. - MasterName string `json:"master_name" mapstructure:"master_name"` - // Sentinel redis password. - SentinelPassword string `json:"sentinel_password" mapstructure:"sentinel_password"` - // Redis username. - Username string `json:"username" mapstructure:"username"` - // Redis password. - Password string `json:"password" mapstructure:"password"` - // Redis database. - Database int `json:"database" mapstructure:"database"` - // How long to allow for new connections to be established (in milliseconds). Defaults to 5sec. - Timeout int `json:"timeout" mapstructure:"timeout"` - // Maximum number of idle connections in the pool. - MaxIdle int `json:"optimisation_max_idle" mapstructure:"optimisation_max_idle"` - // Maximum number of connections allocated by the pool at a given time. When zero, there is no - // limit on the number of connections in the pool. Defaults to 500. - MaxActive int `json:"optimisation_max_active" mapstructure:"optimisation_max_active"` - // Enable this option if you are using a redis cluster. Default is `false`. - EnableCluster bool `json:"enable_cluster" mapstructure:"enable_cluster"` - // Prefix the redis key names. Defaults to "analytics-". - RedisKeyPrefix string `json:"redis_key_prefix" mapstructure:"redis_key_prefix"` - // Setting this to true to use SSL when connecting to Redis. - RedisUseSSL bool `json:"redis_use_ssl" mapstructure:"redis_use_ssl"` - // Set this to `true` to tell Pump to ignore Redis' cert validation. - RedisSSLInsecureSkipVerify bool `json:"redis_ssl_insecure_skip_verify" mapstructure:"redis_ssl_insecure_skip_verify"` -} - -// RedisClusterStorageManager is a storage manager that uses the redis database. -type RedisClusterStorageManager struct { - db redis.UniversalClient - KeyPrefix string - HashKeys bool - Config RedisStorageConfig -} - -func NewRedisClusterPool(forceReconnect bool, config RedisStorageConfig) redis.UniversalClient { - if !forceReconnect { - if redisClusterSingleton != nil { - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Redis pool already INITIALISED") - return redisClusterSingleton - } - } else { - if redisClusterSingleton != nil { - redisClusterSingleton.Close() - } - } - - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Creating new Redis connection pool") - - maxActive := 500 - if config.MaxActive > 0 { - maxActive = config.MaxActive - } - - timeout := 5 * time.Second - - if config.Timeout > 0 { - timeout = time.Duration(config.Timeout) * time.Second - } - - var tlsConfig *tls.Config - if config.RedisUseSSL { - tlsConfig = &tls.Config{ - InsecureSkipVerify: config.RedisSSLInsecureSkipVerify, - } - } - - var client redis.UniversalClient - opts := &redis.UniversalOptions{ - MasterName: config.MasterName, - SentinelPassword: config.SentinelPassword, - Addrs: getRedisAddrs(config), - DB: config.Database, - Username: config.Username, - Password: config.Password, - PoolSize: maxActive, - IdleTimeout: 240 * time.Second, - ReadTimeout: timeout, - WriteTimeout: timeout, - DialTimeout: timeout, - TLSConfig: tlsConfig, - } - - if opts.MasterName != "" { - log.Info("--> [REDIS] Creating sentinel-backed failover client") - client = redis.NewFailoverClient(opts.Failover()) - } else if config.EnableCluster { - log.Info("--> [REDIS] Creating cluster client") - client = redis.NewClusterClient(opts.Cluster()) - } else { - log.Info("--> [REDIS] Creating single-node client") - client = redis.NewClient(opts.Simple()) - } - - redisClusterSingleton = client - - return client -} - -func getRedisAddrs(config RedisStorageConfig) (addrs []string) { - if len(config.Addrs) != 0 { - addrs = config.Addrs - } else { - for h, p := range config.Hosts { - addr := h + ":" + p - addrs = append(addrs, addr) - } - } - - if len(addrs) == 0 && config.Port != 0 { - addr := config.Host + ":" + strconv.Itoa(config.Port) - addrs = append(addrs, addr) - } - - return addrs -} - -func (r *RedisClusterStorageManager) GetName() string { - return "redis" -} - -func (r *RedisClusterStorageManager) Init(config interface{}) error { - r.Config = RedisStorageConfig{} - err := mapstructure.Decode(config, &r.Config) - if err != nil { - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Fatal("Failed to decode configuration: ", err) - } - - overrideErr := envconfig.Process(ENV_REDIS_PREFIX, &r.Config) - if overrideErr != nil { - log.Error("Failed to process environment variables for redis: ", overrideErr) - } - - if r.Config.RedisKeyPrefix == "" { - r.KeyPrefix = RedisKeyPrefix - } else { - r.KeyPrefix = r.Config.RedisKeyPrefix - } - return nil -} - -// Connect will establish a connection to the r.db -func (r *RedisClusterStorageManager) Connect() bool { - if r.db == nil { - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Connecting to redis cluster") - r.db = NewRedisClusterPool(false, r.Config) - return true - } - - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Storage Engine already initialized...") - - // Reset it just in case - r.db = redisClusterSingleton - return true -} - -func (r *RedisClusterStorageManager) hashKey(in string) string { - return in -} - -func (r *RedisClusterStorageManager) fixKey(keyName string) string { - setKeyName := r.KeyPrefix + r.hashKey(keyName) - - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Input key was: ", setKeyName) - - return setKeyName -} - -func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string, chunkSize int64, expire time.Duration) []interface{} { - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Getting raw key set: ", keyName) - - if r.db == nil { - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Warning("Connection dropped, connecting..") - r.Connect() - return r.GetAndDeleteSet(keyName, chunkSize, expire) - } - - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("keyName is: ", keyName) - - fixedKey := r.fixKey(keyName) - - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Fixed keyname is: ", fixedKey) - - var lrange *redis.StringSliceCmd - _, err := r.db.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - lrange = pipe.LRange(ctx, fixedKey, 0, chunkSize-1) - - if chunkSize == 0 { - pipe.Del(ctx, fixedKey) - } else { - pipe.LTrim(ctx, fixedKey, chunkSize, -1) - - // extend expiry after successful LTRIM - pipe.Expire(ctx, fixedKey, expire) - } - return nil - }) - - if err != nil { - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Error("Multi command failed: ", err) - r.Connect() - } - - vals := lrange.Val() - - result := make([]interface{}, len(vals)) - for i, v := range vals { - result[i] = v - } - - log.WithFields(logrus.Fields{ - "prefix": redisLogPrefix, - }).Debug("Unpacked vals: ", len(result)) - - return result -} - -// SetKey will create (or update) a key value in the store -func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int64) error { - log.Debug("[STORE] SET Raw key is: ", keyName) - log.Debug("[STORE] Setting key: ", r.fixKey(keyName)) - - r.ensureConnection() - err := r.db.Set(ctx, r.fixKey(keyName), session, 0).Err() - if timeout > 0 { - if err := r.SetExp(keyName, timeout); err != nil { - return err - } - } - if err != nil { - log.Error("Error trying to set value: ", err) - return err - } - return nil -} - -func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error { - err := r.db.Expire(ctx, r.fixKey(keyName), time.Duration(timeout)*time.Second).Err() - if err != nil { - log.Error("Could not EXPIRE key: ", err) - } - return err -} - -func (r *RedisClusterStorageManager) ensureConnection() { - if r.db != nil { - // already connected - return - } - log.Info("Connection dropped, reconnecting...") - for { - r.Connect() - if r.db != nil { - // reconnection worked - return - } - log.Info("Reconnecting again...") - } -} diff --git a/storage/redis_test.go b/storage/redis_test.go deleted file mode 100644 index 13bffa07b..000000000 --- a/storage/redis_test.go +++ /dev/null @@ -1,142 +0,0 @@ -package storage - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/go-redis/redis/v8" -) - -func TestRedisAddressConfiguration(t *testing.T) { - - t.Run("Host but no port", func(t *testing.T) { - cfg := RedisStorageConfig{Host: "host"} - if len(getRedisAddrs(cfg)) != 0 { - t.Fatal("Port is 0, there is no valid addr") - } - }) - - t.Run("Port but no host", func(t *testing.T) { - cfg := RedisStorageConfig{Port: 30000} - - addrs := getRedisAddrs(cfg) - if addrs[0] != ":30000" || len(addrs) != 1 { - t.Fatal("Port is valid, it is a valid addr") - } - }) - - t.Run("addrs parameter should have precedence", func(t *testing.T) { - cfg := RedisStorageConfig{Host: "host", Port: 30000} - - addrs := getRedisAddrs(cfg) - if addrs[0] != "host:30000" || len(addrs) != 1 { - t.Fatal("Wrong address") - } - - cfg.Addrs = []string{"override:30000"} - - addrs = getRedisAddrs(cfg) - if addrs[0] != "override:30000" || len(addrs) != 1 { - t.Fatal("Wrong address") - } - }) - - t.Run("Default addresses", func(t *testing.T) { - opts := &redis.UniversalOptions{} - simpleOpts := opts.Simple() - - if simpleOpts.Addr != "127.0.0.1:6379" { - t.Fatal("Wrong default single node address") - } - - opts.Addrs = []string{} - clusterOpts := opts.Cluster() - - if clusterOpts.Addrs[0] != "127.0.0.1:6379" || len(clusterOpts.Addrs) != 1 { - t.Fatal("Wrong default cluster mode address") - } - - opts.Addrs = []string{} - failoverOpts := opts.Failover() - - if failoverOpts.SentinelAddrs[0] != "127.0.0.1:26379" || len(failoverOpts.SentinelAddrs) != 1 { - t.Fatal("Wrong default sentinel mode address") - } - }) -} - -var testData = []struct { - in []string - chunk int64 -}{ - {in: nil, chunk: int64(0)}, - {in: []string{"one"}, chunk: int64(0)}, - {in: []string{"one", "two"}, chunk: int64(0)}, - {in: []string{"one", "two", "three"}, chunk: int64(0)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(0)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(0)}, - {in: nil, chunk: int64(1)}, - {in: []string{"one"}, chunk: int64(1)}, - {in: []string{"one", "two"}, chunk: int64(1)}, - {in: []string{"one", "two", "three"}, chunk: int64(1)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(1)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(1)}, - {in: nil, chunk: int64(2)}, - {in: []string{"one"}, chunk: int64(2)}, - {in: []string{"one", "two"}, chunk: int64(2)}, - {in: []string{"one", "two", "three"}, chunk: int64(2)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(2)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(2)}, - {in: nil, chunk: int64(3)}, - {in: []string{"one"}, chunk: int64(3)}, - {in: []string{"one", "two"}, chunk: int64(3)}, - {in: []string{"one", "two", "three"}, chunk: int64(3)}, - {in: []string{"one", "two", "three", "four"}, chunk: int64(3)}, - {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(3)}, -} - -func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) { - conf := make(map[string]interface{}) - conf["host"] = "localhost" - conf["port"] = 6379 - - r := RedisClusterStorageManager{} - if err := r.Init(conf); err != nil { - t.Fatal("unable to connect", err.Error()) - } - - mockKeyName := "testanalytics" - - for _, tt := range testData { - t.Run(fmt.Sprintf("in: %v", tt), func(t *testing.T) { - ctx := context.Background() - if tt.in != nil { - r.db.RPush(ctx, r.fixKey(mockKeyName), tt.in) - } - - iterations := 1 - if tt.chunk > 0 { - iterations = len(tt.in) / int(tt.chunk) - if rem := len(tt.in) % int(tt.chunk); rem > 0 { - iterations += 1 - } - } - - t.Log("iterations", iterations, "tt.in", len(tt.in), "tt.chunk", tt.chunk) - - count := 0 - for i := 0; i < iterations; i++ { - res := r.GetAndDeleteSet(mockKeyName, tt.chunk, 60*time.Second) - - count += len(res) - t.Logf("---> %d: %v", i, res) - } - - if count != len(tt.in) { - t.Fatal() - } - }) - } -} diff --git a/storage/store.go b/storage/store.go index edd0d3cc4..109320efb 100644 --- a/storage/store.go +++ b/storage/store.go @@ -1,16 +1,99 @@ package storage -import "time" +import ( + "strings" + "time" + + "github.com/TykTechnologies/tyk-pump/logger" +) type AnalyticsStorage interface { - Init(config interface{}) error + Init() error GetName() string - Connect() bool - GetAndDeleteSet(setName string, chunkSize int64, expire time.Duration) []interface{} + GetAndDeleteSet(setName string, chunkSize int64, expire time.Duration) ([]interface{}, error) } const ( - RedisKeyPrefix string = "analytics-" + KeyPrefix string = "analytics-" ANALYTICS_KEYNAME string = "tyk-system-analytics" UptimeAnalytics_KEYNAME string = "tyk-uptime-analytics" ) + +var log = logger.GetLogger() + +// nolint:govet +type TemporalStorageConfig struct { + // Deprecated. + Type string `json:"type" mapstructure:"type"` + // Host value. For example: "localhost". + Host string `json:"host" mapstructure:"host"` + // Port value. For example: 6379. + Port int `json:"port" mapstructure:"port"` + // Deprecated: use Addrs instead. + Hosts EnvMapString `json:"hosts" mapstructure:"hosts"` + // Use instead of the host value if you're running a cluster instance with multiple instances. + Addrs []string `json:"addrs" mapstructure:"addrs"` + // Sentinel master name. + MasterName string `json:"master_name" mapstructure:"master_name"` + // Sentinel password. + SentinelPassword string `json:"sentinel_password" mapstructure:"sentinel_password"` + // DB username. + Username string `json:"username" mapstructure:"username"` + // DB password. + Password string `json:"password" mapstructure:"password"` + // Database name. + Database int `json:"database" mapstructure:"database"` + // How long to allow for new connections to be established (in milliseconds). Defaults to 5sec. + Timeout int `json:"timeout" mapstructure:"timeout"` + // Maximum number of idle connections in the pool. + MaxIdle int `json:"optimisation_max_idle" mapstructure:"optimisation_max_idle"` + // Maximum number of connections allocated by the pool at a given time. When zero, there is no + // limit on the number of connections in the pool. Defaults to 500. + MaxActive int `json:"optimisation_max_active" mapstructure:"optimisation_max_active"` + // Enable this option if you are using a cluster instance. Default is `false`. + EnableCluster bool `json:"enable_cluster" mapstructure:"enable_cluster"` + + // Prefix the key names. Defaults to "analytics-". + // Deprecated: use KeyPrefix instead. + RedisKeyPrefix string `json:"redis_key_prefix" mapstructure:"redis_key_prefix"` + // Prefix the key names. Defaults to "analytics-". + KeyPrefix string `json:"key_prefix" mapstructure:"key_prefix"` + + // Setting this to true to use SSL when connecting to the DB. + UseSSL bool `json:"use_ssl" mapstructure:"use_ssl"` + // Set this to `true` to tell Pump to ignore database's cert validation. + SSLInsecureSkipVerify bool `json:"ssl_insecure_skip_verify" mapstructure:"ssl_insecure_skip_verify"` + // Path to the CA file. + SSLCAFile string `json:"ssl_ca_file" mapstructure:"ssl_ca_file"` + // Path to the cert file. + SSLCertFile string `json:"ssl_cert_file" mapstructure:"ssl_cert_file"` + // Path to the key file. + SSLKeyFile string `json:"ssl_key_file" mapstructure:"ssl_key_file"` + // Maximum supported TLS version. Defaults to TLS 1.3, valid values are TLS 1.0, 1.1, 1.2, 1.3. + SSLMaxVersion string `json:"ssl_max_version" mapstructure:"ssl_max_version"` + // Minimum supported TLS version. Defaults to TLS 1.2, valid values are TLS 1.0, 1.1, 1.2, 1.3. + SSLMinVersion string `json:"ssl_min_version" mapstructure:"ssl_min_version"` + // Setting this to true to use SSL when connecting to the DB. + // Deprecated: use UseSSL instead. + RedisUseSSL bool `json:"redis_use_ssl" mapstructure:"redis_use_ssl"` + // Set this to `true` to tell Pump to ignore database's cert validation. + // Deprecated: use SSLInsecureSkipVerify instead. + RedisSSLInsecureSkipVerify bool `json:"redis_ssl_insecure_skip_verify" mapstructure:"redis_ssl_insecure_skip_verify"` +} + +type EnvMapString map[string]string + +func (e *EnvMapString) Decode(value string) error { + units := strings.Split(value, ",") + m := make(map[string]string) + for _, unit := range units { + kvArr := strings.Split(unit, ":") + if len(kvArr) > 1 { + m[kvArr[0]] = kvArr[1] + } + } + + *e = m + + return nil +} diff --git a/storage/temporal_storage.go b/storage/temporal_storage.go new file mode 100644 index 000000000..884e17136 --- /dev/null +++ b/storage/temporal_storage.go @@ -0,0 +1,340 @@ +package storage + +import ( + "context" + "fmt" + "time" + + "github.com/TykTechnologies/storage/temporal/connector" + keyvalue "github.com/TykTechnologies/storage/temporal/keyvalue" + "github.com/TykTechnologies/storage/temporal/list" + "github.com/TykTechnologies/storage/temporal/model" + "github.com/TykTechnologies/tyk-pump/retry" + + "github.com/cenkalti/backoff/v4" + + "github.com/sirupsen/logrus" + + "github.com/kelseyhightower/envconfig" + "github.com/mitchellh/mapstructure" +) + +var ( + connectorSingleton model.Connector + logPrefix = "temporal-storage" + // Deprecated: use envTemporalStoragePrefix instead. + envRedisPrefix = "TYK_PMP_REDIS" + envTemporalStoragePrefix = "TYK_PMP_TEMPORAL_STORAGE" + ctx = context.Background() +) + +// TemporalStorageHandler is a storage manager that uses non data-persistent databases, like Redis. +type TemporalStorageHandler struct { + Config *TemporalStorageConfig + kv model.KeyValue + list model.List + forceReconnect bool +} + +func NewTemporalStorageHandler(config interface{}, forceReconnect bool) (*TemporalStorageHandler, error) { + r := &TemporalStorageHandler{ + forceReconnect: forceReconnect, + } + + switch c := config.(type) { + case map[string]interface{}: + err := mapstructure.Decode(config, &r.Config) + if err != nil { + return nil, err + } + + return r, nil + + case *TemporalStorageConfig: + r.Config = c + + return r, nil + + case TemporalStorageConfig: + r.Config = &c + + return r, nil + + default: + return nil, fmt.Errorf("unsupported config type: %T", config) + } +} + +func (r *TemporalStorageHandler) Init() error { + if r.Config == nil { + r.Config = &TemporalStorageConfig{} + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Config is nil, using default config") + } + + overrideErr := envconfig.Process(envRedisPrefix, r.Config) + if overrideErr != nil { + return overrideErr + } + + overrideErr = envconfig.Process(envTemporalStoragePrefix, r.Config) + if overrideErr != nil { + return overrideErr + } + + switch { + case r.Config.KeyPrefix != "": + // Keep the KeyPrefix as is + case r.Config.RedisKeyPrefix != "": + r.Config.KeyPrefix = r.Config.RedisKeyPrefix + default: + r.Config.KeyPrefix = KeyPrefix + } + + if r.Config.Type != "" { + logPrefix = r.Config.Type + } + + return r.connect() +} + +// Connect will establish a connection to the r.db +func (r *TemporalStorageHandler) connect() error { + var err error + if connectorSingleton == nil || r.forceReconnect { + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Connecting to temporal storage") + if r.Config.Type != "redis" && r.Config.Type != "" { + return fmt.Errorf("unsupported database type: %s", r.Config.Type) + } + + if err = r.resetConnection(r.Config); err != nil { + return err + } + + log.WithFields(logrus.Fields{"prefix": logPrefix}).Debug("Temporal Storage already INITIALISED") + } else if r.kv == nil || r.list == nil { + // This is the case when the connector is already created but we're instantiating a new TemporalStorageHandler + r.kv, err = getKVFromConnector() + if err != nil { + return err + } + + r.list, err = getListFromConnector() + if err != nil { + return err + } + } + + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Storage Engine already initialized...") + + return nil +} + +func (r *TemporalStorageHandler) resetConnection(config *TemporalStorageConfig) error { + if connectorSingleton != nil { + if err := connectorSingleton.Disconnect(ctx); err != nil { + return fmt.Errorf("error disconnecting Temporal Storage: %s", err) + } + } + + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Creating new Redis connection pool") + + maxActive := 500 + if config.MaxActive > 0 { + maxActive = config.MaxActive + } + + timeout := 5 + + if config.Timeout > 0 { + timeout = config.Timeout + } + + opts := &model.RedisOptions{ + MasterName: config.MasterName, + SentinelPassword: config.SentinelPassword, + Addrs: config.Addrs, + Database: config.Database, + Username: config.Username, + Password: config.Password, + MaxActive: maxActive, + Timeout: timeout, + EnableCluster: config.EnableCluster, + Host: config.Host, + Port: config.Port, + Hosts: config.Hosts, + } + + tlsOptions := &model.TLS{ + Enable: config.UseSSL || config.RedisUseSSL, + InsecureSkipVerify: config.SSLInsecureSkipVerify || config.RedisSSLInsecureSkipVerify, + CAFile: config.SSLCAFile, + CertFile: config.SSLCertFile, + KeyFile: config.SSLKeyFile, + MaxVersion: config.SSLMaxVersion, + MinVersion: config.SSLMinVersion, + } + + conn, kv, list, err := createConnector(opts, tlsOptions) + if err != nil { + return err + } + + connectorSingleton = conn + r.kv = kv + r.list = list + + return nil +} + +func createConnector(opts *model.RedisOptions, tlsOptions *model.TLS) (model.Connector, model.KeyValue, model.List, error) { + conn, err := connector.NewConnector(model.RedisV9Type, model.WithRedisConfig(opts), model.WithTLS(tlsOptions)) + if err != nil { + return nil, nil, nil, err + } + + kv, err := keyvalue.NewKeyValue(conn) + if err != nil { + return nil, nil, nil, err + } + + l, err := list.NewList(conn) + if err != nil { + return nil, nil, nil, err + } + + return conn, kv, l, nil +} + +func getKVFromConnector() (model.KeyValue, error) { + kv, err := keyvalue.NewKeyValue(connectorSingleton) + if err != nil { + return nil, err + } + + return kv, nil +} + +func getListFromConnector() (model.List, error) { + l, err := list.NewList(connectorSingleton) + if err != nil { + return nil, err + } + + return l, nil +} + +func (r *TemporalStorageHandler) GetName() string { + if r.Config.Type != "" { + return r.Config.Type + } + + return "redis" +} + +func (r *TemporalStorageHandler) fixKey(keyName string) string { + setKeyName := r.Config.KeyPrefix + keyName + + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Input key was: ", setKeyName) + + return setKeyName +} + +func (r *TemporalStorageHandler) GetAndDeleteSet(keyName string, chunkSize int64, expire time.Duration) ([]interface{}, error) { + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Getting raw key set: ", keyName) + + err := r.ensureConnection() + if err != nil { + return nil, err + } + + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("keyName is: ", keyName) + + fixedKey := r.fixKey(keyName) + + log.WithFields(logrus.Fields{ + "prefix": logPrefix, + }).Debug("Fixed keyname is: ", fixedKey) + + // In Pump, we used to delete a key when chunkSize was 0. + // This is not the case with Storage Library. So we need to check if chunkSize is 0 and set it to -1. + if chunkSize == 0 { + chunkSize = -1 + } + + result, err := r.list.Pop(ctx, fixedKey, chunkSize) + if err != nil { + return nil, err + } + + if chunkSize != -1 { + err = r.kv.Expire(ctx, fixedKey, expire) + if err != nil { + return nil, err + } + } + + intResult := []interface{}{} + for _, v := range result { + intResult = append(intResult, v) + } + + return intResult, nil +} + +// SetKey will create (or update) a key value in the store +func (r *TemporalStorageHandler) SetKey(keyName, session string, timeout int64) error { + log.Debug("[STORE] SET Raw key is: ", keyName) + log.Debug("[STORE] Setting key: ", r.fixKey(keyName)) + + err := r.ensureConnection() + if err != nil { + return err + } + + err = r.kv.Set(ctx, r.fixKey(keyName), session, time.Duration(timeout)*time.Second) + if err != nil { + log.Error("Error trying to set value: ", err) + return err + } + return nil +} + +func (r *TemporalStorageHandler) ensureConnection() error { + if connectorSingleton != nil { + return nil + } + + log.Info("Connection dropped, reconnecting...") + backoffStrategy := retry.GetTemporalStorageExponentialBackoff() + + operation := func() error { + if err := r.connect(); err != nil { + return err + } + + if connectorSingleton == nil { + return fmt.Errorf("connection failed") + } + return nil + } + + if err := backoff.Retry(operation, backoffStrategy); err != nil { + return fmt.Errorf("failed to reconnect after several attempts: %w", err) + } + + return nil +} diff --git a/storage/temporal_storage_test.go b/storage/temporal_storage_test.go new file mode 100644 index 000000000..cd2f2eadb --- /dev/null +++ b/storage/temporal_storage_test.go @@ -0,0 +1,296 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/TykTechnologies/storage/temporal/model" + "github.com/stretchr/testify/assert" +) + +var testData = []struct { + in []string + chunk int64 +}{ + {in: nil, chunk: int64(0)}, + {in: []string{"one"}, chunk: int64(0)}, + {in: []string{"one", "two"}, chunk: int64(0)}, + {in: []string{"one", "two", "three"}, chunk: int64(0)}, + {in: []string{"one", "two", "three", "four"}, chunk: int64(0)}, + {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(0)}, + {in: nil, chunk: int64(1)}, + {in: []string{"one"}, chunk: int64(1)}, + {in: []string{"one", "two"}, chunk: int64(1)}, + {in: []string{"one", "two", "three"}, chunk: int64(1)}, + {in: []string{"one", "two", "three", "four"}, chunk: int64(1)}, + {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(1)}, + {in: nil, chunk: int64(2)}, + {in: []string{"one"}, chunk: int64(2)}, + {in: []string{"one", "two"}, chunk: int64(2)}, + {in: []string{"one", "two", "three"}, chunk: int64(2)}, + {in: []string{"one", "two", "three", "four"}, chunk: int64(2)}, + {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(2)}, + {in: nil, chunk: int64(3)}, + {in: []string{"one"}, chunk: int64(3)}, + {in: []string{"one", "two"}, chunk: int64(3)}, + {in: []string{"one", "two", "three"}, chunk: int64(3)}, + {in: []string{"one", "two", "three", "four"}, chunk: int64(3)}, + {in: []string{"one", "two", "three", "four", "five"}, chunk: int64(3)}, +} + +func TestRedisClusterStorageManager_GetAndDeleteSet(t *testing.T) { + conf := make(map[string]interface{}) + conf["host"] = "localhost" + conf["port"] = 6379 + + r, err := NewTemporalStorageHandler(conf, false) + if err != nil { + t.Fatal(err) + } + + if err := r.Init(); err != nil { + t.Fatal("unable to connect", err.Error()) + } + + if connectorSingleton == nil { + t.Fatal("connectorSingleton is nil") + } + + mockKeyName := "testanalytics" + + for _, tt := range testData { + t.Run(fmt.Sprintf("in: %v", tt), func(t *testing.T) { + if tt.in != nil { + in := [][]byte{} + for _, v := range tt.in { + in = append(in, []byte(v)) + } + err := r.list.Append(ctx, false, r.fixKey(mockKeyName), in...) + if err != nil { + t.Fatal(err) + } + } + + iterations := 1 + if tt.chunk > 0 { + iterations = len(tt.in) / int(tt.chunk) + if rem := len(tt.in) % int(tt.chunk); rem > 0 { + iterations++ + } + } + + t.Log("iterations", iterations, "tt.in", len(tt.in), "tt.chunk", tt.chunk) + + count := 0 + for i := 0; i < iterations; i++ { + res, err := r.GetAndDeleteSet(mockKeyName, tt.chunk, 60*time.Second) + if err != nil { + t.Fatal(err) + } + count += len(res) + t.Logf("---> %d: %v", i, res) + } + + if count != len(tt.in) { + t.Fatal() + } + }) + } +} + +func TestNewTemporalClusterStorageHandler(t *testing.T) { + testCases := []struct { + config *TemporalStorageConfig + testName string + forceReconnect bool + expectConnection bool + }{ + { + testName: "Connect to localhost:6379", + config: &TemporalStorageConfig{Host: "localhost", Port: 6379}, + expectConnection: true, + }, + { + testName: "Force reconnect with existing singleton", + forceReconnect: true, + config: &TemporalStorageConfig{Host: "localhost", Port: 6379}, + expectConnection: true, + }, + { + testName: "Invalid configuration", + config: &TemporalStorageConfig{Host: "invalid-host", Port: 6379}, + expectConnection: false, + forceReconnect: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + tsh, err := NewTemporalStorageHandler(tc.config, tc.forceReconnect) + assert.NoError(t, err, "Expected no error on NewTemporalStorageHandler") + + err = tsh.Init() + assert.NoError(t, err, "Expected no error on NewTemporalStorageHandler Init method") + + assert.NotNil(t, connectorSingleton, "Expected connectorSingleton not to be nil") + + assert.NotNil(t, connectorSingleton, "Expected connectorSingleton not to be nil") + assert.NotNil(t, tsh.kv, "Expected kv not to be nil") + assert.NotNil(t, tsh.list, "Expected list not to be nil") + assert.Equal(t, model.RedisV9Type, connectorSingleton.Type(), "Expected connection type to be RedisV9Type") + + if tc.expectConnection { + assert.NoError(t, connectorSingleton.Ping(context.Background()), "Expected no error on ping") + } else { + assert.Error(t, connectorSingleton.Ping(context.Background()), "Expected error on ping") + } + }) + } +} + +func TestTemporalStorageHandler_ensureConnection(t *testing.T) { + conf := make(map[string]interface{}) + conf["host"] = "localhost" + conf["port"] = 6379 + + r, err := NewTemporalStorageHandler(conf, false) + if err != nil { + t.Fatal(err) + } + + if err := r.Init(); err != nil { + t.Fatal("unable to connect", err.Error()) + } + + if connectorSingleton == nil { + t.Fatal("connectorSingleton is nil") + } + + t.Run("Connection already established", func(t *testing.T) { + err := r.ensureConnection() + assert.NoError(t, err, "Expected no error when connection is already established") + }) + + t.Run("Connection dropped, reconnecting", func(t *testing.T) { + connectorSingleton = nil + err := r.ensureConnection() + assert.NoError(t, err, "Expected no error when reconnecting") + assert.NotNil(t, connectorSingleton, "Expected connectorSingleton not to be nil after reconnecting") + }) +} + +func TestTemporalStorageHandler_SetKey(t *testing.T) { + conf := make(map[string]interface{}) + conf["host"] = "localhost" + conf["port"] = 6379 + + r, err := NewTemporalStorageHandler(conf, false) + if err != nil { + t.Fatal(err) + } + + if err := r.Init(); err != nil { + t.Fatal("unable to connect", err.Error()) + } + + if connectorSingleton == nil { + t.Fatal("connectorSingleton is nil") + } + + keyName := "testKey" + session := "testSession" + timeout := int64(60) + + err = r.SetKey(keyName, session, timeout) + if err != nil { + t.Fatal(err) + } + + // Verify that the key was set correctly + res, err := r.kv.Get(ctx, r.fixKey(keyName)) + if err != nil { + t.Fatal(err) + } + + if res != session { + t.Fatalf("Expected value %s, got %s", session, res) + } +} + +func TestTemporalStorageHandler_GetName(t *testing.T) { + conf := make(map[string]interface{}) + conf["host"] = "localhost" + conf["port"] = 6379 + + r, err := NewTemporalStorageHandler(conf, false) + if err != nil { + t.Fatal(err) + } + + if err := r.Init(); err != nil { + t.Fatal("unable to connect", err.Error()) + } + + if connectorSingleton == nil { + t.Fatal("connectorSingleton is nil") + } + + expected := "redis" + result := r.GetName() + + if result != expected { + t.Fatalf("Expected %s, but got %s", expected, result) + } +} + +func TestTemporalStorageHandler_Init(t *testing.T) { + testCases := []struct { + conf map[string]interface{} + errExpected error + name string + forceReconnect bool + }{ + { + name: "Valid configuration", + conf: map[string]interface{}{ + "host": "localhost", + "port": 6379, + }, + }, + { + name: "Invalid configuration", + conf: map[string]interface{}{ + "host": "abc", + "port": 6379, + "type": "invalid", + }, + forceReconnect: true, + errExpected: errors.New("unsupported database type: invalid"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + r, err := NewTemporalStorageHandler(tc.conf, tc.forceReconnect) + if err != nil { + t.Fatal(err) + } + + err = r.Init() + if err != nil { + assert.Error(t, err, tc.errExpected) + return + } + + assert.NotNil(t, r.Config, "Expected Config not to be nil") + assert.NotNil(t, connectorSingleton, "Expected connectorSingleton not to be nil") + assert.NotNil(t, r.kv, "Expected kv not to be nil") + assert.NotNil(t, r.list, "Expected list not to be nil") + assert.Equal(t, model.RedisV9Type, connectorSingleton.Type(), "Expected connection type to be RedisV9Type") + assert.NoError(t, connectorSingleton.Ping(context.Background()), "Expected no error on ping") + }) + } +}