Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return redis list objects too, converted into json list #749

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 66 additions & 36 deletions backends/redis/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis

import (
"encoding/json"
"fmt"
"os"
"strconv"
Expand All @@ -11,7 +12,6 @@ import (
"github.com/kelseyhightower/confd/log"
)


type watchResponse struct {
waitIndex uint64
err error
Expand All @@ -23,7 +23,7 @@ type Client struct {
machines []string
password string
separator string
psc redis.PubSubConn
psc redis.PubSubConn
pscChan chan watchResponse
}

Expand Down Expand Up @@ -116,25 +116,25 @@ func NewRedisClient(machines []string, password string, separator string) (*Clie
}
log.Debug(fmt.Sprintf("Redis Separator: %#v", separator))
var err error
clientWrapper := &Client{machines: machines, password: password, separator: separator, client: nil, pscChan: make(chan watchResponse), psc: redis.PubSubConn{Conn: nil} }
clientWrapper := &Client{machines: machines, password: password, separator: separator, client: nil, pscChan: make(chan watchResponse), psc: redis.PubSubConn{Conn: nil}}
clientWrapper.client, _, err = tryConnect(machines, password, true)
return clientWrapper, err
}

func (c *Client) transform(key string) string {
if c.separator == "/" {
return key;
return key
}
k := strings.TrimPrefix(key, "/")
return strings.Replace(k, "/", c.separator, -1);
return strings.Replace(k, "/", c.separator, -1)
}

func (c *Client) clean(key string) string {
k := key
if !strings.HasPrefix(k, "/") {
k = "/" + k
}
return strings.Replace(k, c.separator, "/", -1);
return strings.Replace(k, c.separator, "/", -1)
}

// GetValues queries redis for keys prefixed by prefix.
Expand Down Expand Up @@ -163,6 +163,20 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
if err != redis.ErrNil {
return vars, err
}
} else if t == "list" {
values, err := redis.Strings(rClient.Do("LRANGE", k, 0, -1))
if err == nil {
valuesJson, err := redis.String(json.Marshal(values))
if err == nil {
vars[key] = valuesJson
continue
} else {
return vars, err
}
}
if err != redis.ErrNil {
return vars, err
}
} else if t == "hash" {
idx := 0
for {
Expand All @@ -172,15 +186,15 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
}
idx, _ = redis.Int(values[0], nil)
items, _ := redis.Strings(values[1], nil)
for i := 0; i < len(items); i+=2 {
for i := 0; i < len(items); i += 2 {
var newKey, value string
if newKey, err = redis.String(items[i], nil); err != nil {
return vars, err
}
if value, err = redis.String(items[i+1], nil); err != nil {
return vars, err
}
vars[c.clean(k + "/" + newKey)] = value
vars[c.clean(k+"/"+newKey)] = value
}
if idx == 0 {
break
Expand All @@ -192,7 +206,7 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
} else {
k = fmt.Sprintf(c.transform("%s/*"), k)
}

idx := 0
for {
values, err := redis.Values(rClient.Do("SCAN", idx, "MATCH", k, "COUNT", "1000"))
Expand All @@ -206,8 +220,24 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
if newKey, err = redis.String(item, nil); err != nil {
return vars, err
}
if value, err := redis.String(rClient.Do("GET", newKey)); err == nil {
value, err := redis.String(rClient.Do("GET", newKey))
if err == nil {
vars[c.clean(newKey)] = value
} else {
// TODO: how to check if err is redis WrongType? (list, set, hash,...)
// just try if its a list...
values, err := redis.Strings(rClient.Do("LRANGE", newKey, 0, -1))
if err == nil {
valuesJson, err := redis.String(json.Marshal(values))
if err == nil {
vars[key] = valuesJson
continue
} else {
return vars, err
}
} else {
return vars, err
}
}
}
if idx == 0 {
Expand All @@ -226,7 +256,7 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
}

func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {

if waitIndex == 0 {
return 1, nil
}
Expand All @@ -241,15 +271,15 @@ func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, sto

go func() {
if c.psc.Conn == nil {
rClient, db, err := tryConnect(c.machines, c.password, false);
rClient, db, err := tryConnect(c.machines, c.password, false)

if err != nil {
c.psc = redis.PubSubConn{Conn: nil}
c.pscChan <- watchResponse{0, err}
return
}
c.psc = redis.PubSubConn{Conn: rClient}

c.psc = redis.PubSubConn{Conn: rClient}

go func() {
defer func() {
Expand All @@ -258,30 +288,30 @@ func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, sto
}()
for {
switch n := c.psc.Receive().(type) {
case redis.PMessage:
log.Debug(fmt.Sprintf("Redis Message: %s %s\n", n.Channel, n.Data))
data := string(n.Data)
commands := [12]string{"del", "append", "rename_from", "rename_to", "expire", "set", "incrby", "incrbyfloat", "hset", "hincrby", "hincrbyfloat", "hdel"}
for _, command := range commands {
if command == data {
c.pscChan <- watchResponse{1, nil}
break
}
case redis.PMessage:
log.Debug(fmt.Sprintf("Redis Message: %s %s\n", n.Channel, n.Data))
data := string(n.Data)
commands := [12]string{"del", "append", "rename_from", "rename_to", "expire", "set", "incrby", "incrbyfloat", "hset", "hincrby", "hincrbyfloat", "hdel"}
for _, command := range commands {
if command == data {
c.pscChan <- watchResponse{1, nil}
break
}
case redis.Subscription:
log.Debug(fmt.Sprintf("Redis Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count))
if n.Count == 0 {
c.pscChan <- watchResponse{0, nil}
return
}
case error:
log.Debug(fmt.Sprintf("Redis error: %v\n", n))
c.pscChan <- watchResponse{0, n}
}
case redis.Subscription:
log.Debug(fmt.Sprintf("Redis Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count))
if n.Count == 0 {
c.pscChan <- watchResponse{0, nil}
return
}
case error:
log.Debug(fmt.Sprintf("Redis error: %v\n", n))
c.pscChan <- watchResponse{0, n}
return
}
}
}()

c.psc.PSubscribe("__keyspace@" + strconv.Itoa(db) + "__:" + c.transform(prefix) + "*")
}
}()
Expand All @@ -290,7 +320,7 @@ func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, sto
case <-stopChan:
c.psc.PUnsubscribe()
return waitIndex, nil
case r := <- c.pscChan:
case r := <-c.pscChan:
return r.waitIndex, r.err
}
}
}