Skip to content

Commit

Permalink
[*] refactor ConsulLeaderChecker
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed Nov 14, 2024
1 parent d890923 commit 8a094cd
Showing 1 changed file with 18 additions and 31 deletions.
49 changes: 18 additions & 31 deletions checker/consul_leader_checker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package checker

import (
"cmp"
"context"
"fmt"
"net/url"
"time"

Expand All @@ -11,83 +13,68 @@ import (

// ConsulLeaderChecker is used to check state of the leader key in Consul
type ConsulLeaderChecker struct {
key string
nodename string
apiClient *api.Client
*vipconfig.Config
*api.Client
}

// naming this cConf to avoid conflict with conf in etcd_leader_checker.go
var cConf *vipconfig.Config

// NewConsulLeaderChecker returns a new instance
func NewConsulLeaderChecker(con *vipconfig.Config) (*ConsulLeaderChecker, error) {
cConf = con
lc := &ConsulLeaderChecker{
key: cConf.TriggerKey,
nodename: cConf.TriggerValue,
}
func NewConsulLeaderChecker(con *vipconfig.Config) (lc *ConsulLeaderChecker, err error) {
lc = &ConsulLeaderChecker{Config: con}

url, err := url.Parse(cConf.Endpoints[0])
url, err := url.Parse(con.Endpoints[0])
if err != nil {
return nil, err
}
address := url.Hostname() + ":" + url.Port()

config := &api.Config{
Address: address,
Address: fmt.Sprintf("%s:%s", url.Hostname(), url.Port()),
Scheme: url.Scheme,
WaitTime: time.Second,
Token: cmp.Or(con.ConsulToken, ""),
}

if cConf.ConsulToken != "" {
config.Token = cConf.ConsulToken
}

apiClient, err := api.NewClient(config)
if err != nil {
if lc.Client, err = api.NewClient(config); err != nil {
return nil, err
}

lc.apiClient = apiClient

return lc, nil
}

// GetChangeNotificationStream checks the status in the loop
func (c *ConsulLeaderChecker) GetChangeNotificationStream(ctx context.Context, out chan<- bool) error {
kv := c.apiClient.KV()
kv := c.Client.KV()

queryOptions := &api.QueryOptions{
RequireConsistent: true,
}

checkLoop:
for {
resp, _, err := kv.Get(c.key, queryOptions)
resp, _, err := kv.Get(c.TriggerKey, queryOptions)
if err != nil {
if ctx.Err() != nil {
break checkLoop
}
cConf.Logger.Sugar().Error("consul error: ", err)
c.Logger.Sugar().Error("consul error: ", err)
out <- false
time.Sleep(time.Duration(cConf.Interval) * time.Millisecond)
time.Sleep(time.Duration(c.Interval) * time.Millisecond)
continue
}
if resp == nil {
cConf.Logger.Sugar().Errorf("Cannot get variable for key %s. Will try again in a second.", c.key)
c.Logger.Sugar().Errorf("Cannot get variable for key %s. Will try again in a second.", c.TriggerKey)
out <- false
time.Sleep(time.Duration(cConf.Interval) * time.Millisecond)
time.Sleep(time.Duration(c.Interval) * time.Millisecond)
continue
}

state := string(resp.Value) == c.nodename
state := string(resp.Value) == c.TriggerValue
queryOptions.WaitIndex = resp.ModifyIndex

select {
case <-ctx.Done():
break checkLoop
case out <- state:
time.Sleep(time.Duration(cConf.Interval) * time.Millisecond)
time.Sleep(time.Duration(c.Interval) * time.Millisecond)
continue
}
}
Expand Down

0 comments on commit 8a094cd

Please sign in to comment.