Skip to content

Commit

Permalink
Merge pull request #613 from cgalibern/dev
Browse files Browse the repository at this point in the history
misc
  • Loading branch information
cgalibern authored Oct 30, 2024
2 parents 59559b0 + a9e4ddd commit 2fd88b8
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 32 deletions.
17 changes: 11 additions & 6 deletions core/output/renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"github.com/andreazorzetto/yh/highlight"
"github.com/fatih/color"
tabwriter "github.com/juju/ansiterm"
"k8s.io/client-go/util/jsonpath"
"sigs.k8s.io/yaml"

"github.com/opensvc/om3/util/render"
"github.com/opensvc/om3/util/render/palette"
"github.com/opensvc/om3/util/unstructured"
"k8s.io/client-go/util/jsonpath"
"sigs.k8s.io/yaml"
)

type (
Expand Down Expand Up @@ -241,15 +242,19 @@ func (t Renderer) renderTab(options string) (string, error) {
return "", err
}
for _, line := range unstructuredData {
for _, jsonPath := range jsonPaths {
for i, jsonPath := range jsonPaths {
var sep string
if i > 0 {
sep = "\t"
}
values, err := jsonPath.FindResults(line)
if err != nil {
fmt.Fprintf(w, "<%s>\t", err)
fmt.Fprintf(w, "%s<%s>", sep, err)
continue
}
valueStrings := []string{}
if len(values) == 0 || len(values[0]) == 0 {
fmt.Fprintf(w, "<none>\t")
fmt.Fprintf(w, "%s<none>", sep)
continue
}
for arrIx := range values {
Expand All @@ -263,7 +268,7 @@ func (t Renderer) renderTab(options string) (string, error) {
}
}
value := strings.Join(valueStrings, ",")
fmt.Fprintf(w, value+"\t")
fmt.Fprintf(w, "%s%s", sep, value)
}
fmt.Fprintf(w, "\n")
}
Expand Down
8 changes: 5 additions & 3 deletions daemon/discover/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,11 @@ func (t *Manager) onRemoteConfigFetched(c *msgbus.RemoteFileConfig) {
c.Err <- nil
default:
confFile := c.Path.ConfigFile()
if err := freezeIfOrchestrateHA(confFile); err != nil {
c.Err <- err
return
if instance.ConfigData.Get(c.Path, t.localhost) == nil {
if err := freezeIfOrchestrateHA(confFile); err != nil {
c.Err <- err
return
}
}
if err := os.Rename(c.File, confFile); err != nil {
log.Errorf("cfg: can't install %s config fetched from node %s to %s: %s", c.Path, c.Node, confFile, err)
Expand Down
33 changes: 30 additions & 3 deletions daemon/hb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,23 @@ func (t *T) startHbTx(hb hbcfg.Confer) error {
return fmt.Errorf("nil tx for %s", hb.Name())
}
t.ctrlC <- hbctrl.CmdRegister{ID: tx.ID(), Type: hb.Type()}
localDataC := make(chan []byte)
if err := tx.Start(t.ctrlC, localDataC); err != nil {

// start debounce msg goroutine to ensure non-blocking write to msgToSendQ:
// the msgToTxCtx goroutine multiplexes data messages to all hb tx drivers.
// It can't be stalled because of slow hb trasnmitter.
debouncedMsgQ := make(chan []byte)
msgToSendQ := make(chan []byte)
go debounceLatestMsgToTx(t.msgToTxCtx, msgToSendQ, debouncedMsgQ)

if err := tx.Start(t.ctrlC, debouncedMsgQ); err != nil {
t.log.Errorf("start %s failed: %s", tx.ID(), err)
t.ctrlC <- hbctrl.CmdSetState{ID: tx.ID(), State: "failed"}
return err
}
select {
case <-t.msgToTxCtx.Done():
// don't hang up when context is done
case t.msgToTxRegister <- registerTxQueue{id: tx.ID(), msgToSendQueue: localDataC}:
case t.msgToTxRegister <- registerTxQueue{id: tx.ID(), msgToSendQueue: msgToSendQ}:
t.txs[hb.Name()] = tx
}
return nil
Expand Down Expand Up @@ -597,3 +604,23 @@ func (t *T) getHbConfiguredComponent(ctx context.Context, rid string) (c hbcfg.C
err = fmt.Errorf("not found rid")
return
}

// debounceLatestMsgToTx is used to relay dequeued messages from inQ
// to outC, without blocking on outQ (lastest dequeued message from
// inQ will replace the relay bloqued message to outQ).
func debounceLatestMsgToTx(ctx context.Context, inQ <-chan []byte, outC chan<- []byte) {
var (
b []byte
o chan<- []byte
)
for {
select {
case b = <-inQ:
o = outC
case o <- b:
o = nil
case <-ctx.Done():
return
}
}
}
19 changes: 0 additions & 19 deletions daemon/imon/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,25 +718,6 @@ func (t *Manager) AllInstanceMonitors() map[string]instance.Monitor {
return m
}

func (t *Manager) isExtraInstance() (bool, string) {
if t.state.IsHALeader {
return false, "object is not leader"
}
if v, reason := t.isHAOrchestrateable(); !v {
return false, reason
}
if t.objStatus.Avail != status.Up {
return false, "object is not up"
}
if t.objStatus.Topology != topology.Flex {
return false, "object is not flex"
}
if t.objStatus.UpInstancesCount <= t.objStatus.FlexTarget {
return false, fmt.Sprintf("%d/%d up instances", t.objStatus.UpInstancesCount, t.objStatus.FlexTarget)
}
return true, ""
}

func (t *Manager) isHAOrchestrateable() (bool, string) {
if (t.objStatus.Topology == topology.Failover) && (t.objStatus.Avail == status.Warn) {
return false, "failover object is warn state"
Expand Down
12 changes: 11 additions & 1 deletion daemon/imon/orchestration_ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,19 @@ func (t *Manager) orchestrateHAStop() {
if t.objStatus.Topology != topology.Flex {
return
}
if v, _ := t.isExtraInstance(); !v {

if t.nodeStatus[t.localhost].IsFrozen() {
return
} else if t.objStatus.UpInstancesCount <= t.objStatus.FlexTarget {
return
} else if t.state.IsHALeader {
return
} else if v, _ := t.isHAOrchestrateable(); !v {
return
} else if t.objStatus.Avail != status.Up {
return
}

t.stop()
}

Expand Down

0 comments on commit 2fd88b8

Please sign in to comment.