From 6f03335fde994d17911727edae297c35560ebe42 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 29 Oct 2024 11:58:15 +0100 Subject: [PATCH 1/4] [daemon/hb] Prevents MsgToTx staled on slow hb transmitter MsgToTx can be slowed because of slow hb transmitters: => The messages takes more time to reach peer nodes Each hb transmitter starts a goroutine debounceLatestMsgToTx: It relays messages that must be sent do Tx to the hb tx message Q. Under pressure, only the latest message is sent (pending message are droppped) --- daemon/hb/main.go | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/daemon/hb/main.go b/daemon/hb/main.go index 25fb35c54..7e815b442 100644 --- a/daemon/hb/main.go +++ b/daemon/hb/main.go @@ -183,8 +183,15 @@ func (t *T) startHbTx(hb hbcfg.Confer) error { return fmt.Errorf("nil tx for %s", hb.Name()) } t.ctrlC <- hbctrl.CmdRegister{ID: tx.ID(), Type: hb.Type()} - localDataC := make(chan []byte) - if err := tx.Start(t.ctrlC, localDataC); err != nil { + + // start debounce msg goroutine to ensure non-blocking write to msgToSendQ: + // the msgToTxCtx goroutine multiplexes data messages to all hb tx drivers. + // It can't be stalled because of slow hb trasnmitter. + debouncedMsgQ := make(chan []byte) + msgToSendQ := make(chan []byte) + go debounceLatestMsgToTx(t.msgToTxCtx, msgToSendQ, debouncedMsgQ) + + if err := tx.Start(t.ctrlC, debouncedMsgQ); err != nil { t.log.Errorf("start %s failed: %s", tx.ID(), err) t.ctrlC <- hbctrl.CmdSetState{ID: tx.ID(), State: "failed"} return err @@ -192,7 +199,7 @@ func (t *T) startHbTx(hb hbcfg.Confer) error { select { case <-t.msgToTxCtx.Done(): // don't hang up when context is done - case t.msgToTxRegister <- registerTxQueue{id: tx.ID(), msgToSendQueue: localDataC}: + case t.msgToTxRegister <- registerTxQueue{id: tx.ID(), msgToSendQueue: msgToSendQ}: t.txs[hb.Name()] = tx } return nil @@ -597,3 +604,23 @@ func (t *T) getHbConfiguredComponent(ctx context.Context, rid string) (c hbcfg.C err = fmt.Errorf("not found rid") return } + +// debounceLatestMsgToTx is used to relay dequeued messages from inQ +// to outC, without blocking on outQ (lastest dequeued message from +// inQ will replace the relay bloqued message to outQ). +func debounceLatestMsgToTx(ctx context.Context, inQ <-chan []byte, outC chan<- []byte) { + var ( + b []byte + o chan<- []byte + ) + for { + select { + case b = <-inQ: + o = outC + case o <- b: + o = nil + case <-ctx.Done(): + return + } + } +} From 8cf1784cb48fc9be7a770afa6e64dda8dd1acb12 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 29 Oct 2024 16:34:01 +0100 Subject: [PATCH 2/4] [daemon/imon] fix unexpected stop extra flex instance on frozen node reproducer: om flx deploy --kw nodes=\* --kw orchestrate=ha --kw topology=flex --kw fs#1.type=flag om flx set --kw flex_target=3 om cluster thaw --wait om flx thaw --wait om cluster freeze --wait om flx set --kw flex_target=1 --- daemon/imon/main_cmd.go | 19 ------------------- daemon/imon/orchestration_ha.go | 12 +++++++++++- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/daemon/imon/main_cmd.go b/daemon/imon/main_cmd.go index a6ed56c5a..16a637990 100644 --- a/daemon/imon/main_cmd.go +++ b/daemon/imon/main_cmd.go @@ -718,25 +718,6 @@ func (t *Manager) AllInstanceMonitors() map[string]instance.Monitor { return m } -func (t *Manager) isExtraInstance() (bool, string) { - if t.state.IsHALeader { - return false, "object is not leader" - } - if v, reason := t.isHAOrchestrateable(); !v { - return false, reason - } - if t.objStatus.Avail != status.Up { - return false, "object is not up" - } - if t.objStatus.Topology != topology.Flex { - return false, "object is not flex" - } - if t.objStatus.UpInstancesCount <= t.objStatus.FlexTarget { - return false, fmt.Sprintf("%d/%d up instances", t.objStatus.UpInstancesCount, t.objStatus.FlexTarget) - } - return true, "" -} - func (t *Manager) isHAOrchestrateable() (bool, string) { if (t.objStatus.Topology == topology.Failover) && (t.objStatus.Avail == status.Warn) { return false, "failover object is warn state" diff --git a/daemon/imon/orchestration_ha.go b/daemon/imon/orchestration_ha.go index 39259fb99..d1f25d46c 100644 --- a/daemon/imon/orchestration_ha.go +++ b/daemon/imon/orchestration_ha.go @@ -19,9 +19,19 @@ func (t *Manager) orchestrateHAStop() { if t.objStatus.Topology != topology.Flex { return } - if v, _ := t.isExtraInstance(); !v { + + if t.nodeStatus[t.localhost].IsFrozen() { + return + } else if t.objStatus.UpInstancesCount <= t.objStatus.FlexTarget { + return + } else if t.state.IsHALeader { + return + } else if v, _ := t.isHAOrchestrateable(); !v { + return + } else if t.objStatus.Avail != status.Up { return } + t.stop() } From 7e052ca22e113e2f5868867fd68ffc7b91e49779 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 29 Oct 2024 16:35:40 +0100 Subject: [PATCH 3/4] [daemon/discover] Skip freeze existing ha instance on fetch config --- daemon/discover/cfg.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/daemon/discover/cfg.go b/daemon/discover/cfg.go index bd0da135b..b1fe22081 100644 --- a/daemon/discover/cfg.go +++ b/daemon/discover/cfg.go @@ -538,9 +538,11 @@ func (t *Manager) onRemoteConfigFetched(c *msgbus.RemoteFileConfig) { c.Err <- nil default: confFile := c.Path.ConfigFile() - if err := freezeIfOrchestrateHA(confFile); err != nil { - c.Err <- err - return + if instance.ConfigData.Get(c.Path, t.localhost) == nil { + if err := freezeIfOrchestrateHA(confFile); err != nil { + c.Err <- err + return + } } if err := os.Rename(c.File, confFile); err != nil { log.Errorf("cfg: can't install %s config fetched from node %s to %s: %s", c.Path, c.Node, confFile, err) From a9e4ddd4b065382b7567636ecd228ea1160b3767 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 29 Oct 2024 17:18:45 +0100 Subject: [PATCH 4/4] [core/output] Don't separator on last field of a tab line --- core/output/renderer.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/core/output/renderer.go b/core/output/renderer.go index 5e16f3b13..799ff35d4 100644 --- a/core/output/renderer.go +++ b/core/output/renderer.go @@ -12,11 +12,12 @@ import ( "github.com/andreazorzetto/yh/highlight" "github.com/fatih/color" tabwriter "github.com/juju/ansiterm" + "k8s.io/client-go/util/jsonpath" + "sigs.k8s.io/yaml" + "github.com/opensvc/om3/util/render" "github.com/opensvc/om3/util/render/palette" "github.com/opensvc/om3/util/unstructured" - "k8s.io/client-go/util/jsonpath" - "sigs.k8s.io/yaml" ) type ( @@ -241,15 +242,19 @@ func (t Renderer) renderTab(options string) (string, error) { return "", err } for _, line := range unstructuredData { - for _, jsonPath := range jsonPaths { + for i, jsonPath := range jsonPaths { + var sep string + if i > 0 { + sep = "\t" + } values, err := jsonPath.FindResults(line) if err != nil { - fmt.Fprintf(w, "<%s>\t", err) + fmt.Fprintf(w, "%s<%s>", sep, err) continue } valueStrings := []string{} if len(values) == 0 || len(values[0]) == 0 { - fmt.Fprintf(w, "\t") + fmt.Fprintf(w, "%s", sep) continue } for arrIx := range values { @@ -263,7 +268,7 @@ func (t Renderer) renderTab(options string) (string, error) { } } value := strings.Join(valueStrings, ",") - fmt.Fprintf(w, value+"\t") + fmt.Fprintf(w, "%s%s", sep, value) } fmt.Fprintf(w, "\n") }