Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic-lsnr-port #418

Merged
merged 23 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1c2e47a
[listener] Startup ux listener first
cgalibern Sep 28, 2023
384128f
[core/node] Defines Lsnr and LsnrData to track nodes inet listener ad…
cgalibern Sep 28, 2023
793af2a
[lsnrhttpinet] Publish Listener{Updated|Deleted} & updates node.LsnrD…
cgalibern Sep 28, 2023
07f653d
[nmon] Updates nodes_info.json with localhost node.Lsnr
cgalibern Sep 28, 2023
bb26040
[nmon] Updates nodes_info.json with remotes node.Lsnr
cgalibern Sep 28, 2023
0946b24
[lsnrhttpinet] Restart listener when addr is changed
cgalibern Sep 29, 2023
6275e36
[msgbus] Fix missing json tag on NodeStatusUpdated.Node
cgalibern Sep 29, 2023
f11cceb
[discover] Detect peer lsnr port to fetch remote config files
cgalibern Sep 29, 2023
775503e
Listener keywords port and addr are now scope able
cgalibern Sep 29, 2023
8994382
[actionrouter] Fix human render for unexpected type
cgalibern Sep 29, 2023
7e64ae0
[core/node] Status has now Lsnr member
cgalibern Sep 29, 2023
6aeae64
[daemondata] Handle ListenerUpdated event propagation
cgalibern Sep 29, 2023
b33ab5c
[nmon] track peer lsnr changes to nodes_info.json and .cluster.node.<…
cgalibern Sep 29, 2023
315b733
[ccfg] log eval listener keyword errors
cgalibern Sep 30, 2023
dc0b982
Drop ListenerDeleted event
cgalibern Sep 30, 2023
2b93ba6
[daemaondata] Set NodeStatus.Lsnr.UpdatedAt value during init
cgalibern Sep 30, 2023
2efbf87
[core/client] Detect port from nodes_info.json
cgalibern Sep 30, 2023
ac15ff1
[cluster] Render listener port during 'om mon'
cgalibern Sep 30, 2023
6b0977b
[daemondata] LsnrData unset peer during dropPeer
cgalibern Sep 30, 2023
339509b
[test] Fix monitor render since port added to listener
cgalibern Sep 30, 2023
8ab64c2
[command] om daemon leave and join support custom listener port
cgalibern Sep 30, 2023
0da522c
[daemondata] Fix possible unpublished peer messages during apply full
cgalibern Oct 2, 2023
0387061
[nmon] nodes_info.json may not exist after daemon startup (fix)
cgalibern Oct 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ func newCmdDaemonJoin() *cobra.Command {
}
flags := cmd.Flags()
flags.StringVar(&options.Node, "node", "", "the name of the cluster node we want to join")
flags.StringVar(&options.Server, "server", "", "URI of the opensvc api server when custom port is used.")

if err := cmd.MarkFlagRequired("node"); err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/actionrouter/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,6 @@ func DefaultHumanRenderer(data interface{}) string {
case []byte:
return string(v)
default:
return ""
return fmt.Sprintln(v)
}
}
11 changes: 10 additions & 1 deletion core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
reqh2 "github.com/opensvc/om3/core/client/requester/h2"
"github.com/opensvc/om3/core/clientcontext"
"github.com/opensvc/om3/core/env"
"github.com/opensvc/om3/core/nodesinfo"
"github.com/opensvc/om3/core/rawconfig"
oapi "github.com/opensvc/om3/daemon/api"
"github.com/opensvc/om3/daemon/daemonenv"
Expand Down Expand Up @@ -208,7 +209,15 @@ func (t *T) newRequester() (err error) {
})
default:
if !strings.Contains(t.url, ":") {
t.url += ":" + fmt.Sprint(daemonenv.HttpPort)
if nodesInfo, err := nodesinfo.Load(); err != nil {
t.url += ":" + fmt.Sprint(daemonenv.HttpPort)
} else {
if nodeInfo, ok := nodesInfo[t.url]; ok && nodeInfo.Lsnr.Port != "" {
t.url += ":" + nodeInfo.Lsnr.Port
} else {
t.url += ":" + fmt.Sprint(daemonenv.HttpPort)
}
}
}
t.url = reqh2.InetPrefix + t.url
t.ClientWithResponses, err = reqh2.NewInet(reqh2.Config{
Expand Down
11 changes: 6 additions & 5 deletions core/cluster/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ var (
type (
// Frame exposes daemon status renderer tunables.
Frame struct {
Nodes []string
Sections []string
Current Data
Previous Data
Stats Stats
Localhost string
Nodes []string
Sections []string
Current Data
Previous Data
Stats Stats

// private
w *tabwriter.TabWriter
Expand Down
17 changes: 12 additions & 5 deletions core/cluster/frame_threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cluster
import (
"fmt"

"github.com/opensvc/om3/util/hostname"
"github.com/opensvc/om3/util/render/listener"
)

Expand Down Expand Up @@ -40,14 +39,22 @@ func (f Frame) wThreadCollector() string {
func (f Frame) wThreadListener() string {
var s string
s += bold(" listener") + "\t"
if f.Current.Daemon.Listener.State == "running" {
if f.Current.Cluster.Node[f.Localhost].Status.Lsnr.Port != "" {
s += green("running") + "\t"
} else {
s += "\t"
}
s += fmt.Sprintf("%s\t", listener.Render(f.Current.Daemon.Listener.Config.Addr, f.Current.Daemon.Listener.Config.Port))
s += fmt.Sprintf("%s\t", listener.Render(f.Current.Daemon.Listener.Config.Addr, f.Current.Cluster.Config.Listener.Port))
s += f.info.separator + "\t"
s += f.info.emptyNodes
for _, node := range f.Current.Cluster.Config.Nodes {
port := f.Current.Cluster.Node[node].Status.Lsnr.Port
switch port {
case "":
s += iconDownIssue + "\t"
default:
s += port + "\t"
}
}
return s
}

Expand Down Expand Up @@ -112,7 +119,7 @@ func (f Frame) wThreadHeartbeats() string {
s += "\t" + hbStatus.Type + "\t"
s += f.info.separator + "\t"
for _, peer := range f.Current.Cluster.Config.Nodes {
if peer == hostname.Hostname() {
if peer == f.Localhost {
s += iconNotApplicable + "\t"
continue
}
Expand Down
13 changes: 10 additions & 3 deletions core/commands/daemon_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type (
CmdDaemonJoin struct {
CmdDaemonCommon

Node string
Token string
Node string
Server string
Token string

// Timeout is the maximum duration for leave
Timeout time.Duration
Expand Down Expand Up @@ -65,8 +66,14 @@ func (t *CmdDaemonJoin) run() error {
_ = os.Remove(name)
}(certFile)

var url string
if t.Server == "" {
url = daemonenv.UrlHttpNode(t.Node)
} else {
url = t.Server
}
cli, err = client.New(
client.WithURL(daemonenv.UrlHttpNode(t.Node)),
client.WithURL(url),
client.WithRootCa(certFile),
client.WithBearer(t.Token),
)
Expand Down
3 changes: 1 addition & 2 deletions core/commands/daemon_leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/opensvc/om3/core/event"
"github.com/opensvc/om3/core/object"
"github.com/opensvc/om3/daemon/api"
"github.com/opensvc/om3/daemon/daemonenv"
"github.com/opensvc/om3/daemon/msgbus"
"github.com/opensvc/om3/util/hostname"
)
Expand All @@ -37,7 +36,7 @@ func (t *CmdDaemonLeave) Run() (err error) {
return err
}
t.cli, err = client.New(
client.WithURL(daemonenv.UrlHttpNode(t.ApiNode)),
client.WithURL(t.ApiNode),
)
if err != nil {
return
Expand Down
6 changes: 4 additions & 2 deletions core/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/opensvc/om3/core/output"
"github.com/opensvc/om3/core/rawconfig"
"github.com/opensvc/om3/daemon/msgbus"
"github.com/opensvc/om3/util/hostname"
)

type (
Expand Down Expand Up @@ -131,8 +132,9 @@ func (m *T) Do(getter Getter, out io.Writer) error {
func (m *T) doOneShot(data cluster.Data, clear bool, out io.Writer) {
human := func() string {
f := cluster.Frame{
Current: data,
Sections: m.sections,
Current: data,
Sections: m.sections,
Localhost: hostname.Hostname(),
}
return f.Render()
}
Expand Down
3 changes: 2 additions & 1 deletion core/monitor/testdata/single-node-daemon-status.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
"min_avail_mem": 0,
"min_avail_swap": 0,
"speaker": false,
"labels": {}
"labels": {},
"lsnr": {"port": "2222"}
},
"os": {
"paths": [
Expand Down
2 changes: 1 addition & 1 deletion core/monitor/testdata/single-node-om-mon.fixture
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Threads dev1n3
dns |
collector |
hb |
listener :0 |
listener :0 | 2222
monitor |
scheduler |

Expand Down
7 changes: 6 additions & 1 deletion core/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type (
Gen = map[string]uint64

Dataer interface {
Config | Monitor | san.Paths | Stats | Status | Gen
Config | Monitor | Lsnr | san.Paths | Stats | Status | Gen
}

DataElement[T Dataer] struct {
Expand All @@ -32,6 +32,9 @@ var (
// MonitorData is the package data holder for all nodes Monitors
MonitorData *Data[Monitor]

// LsnrData is the package data holder for all nodes Lsnr
LsnrData *Data[Lsnr]

// OsPathsData is the package data holder for all nodes Os paths data
OsPathsData *Data[san.Paths]

Expand Down Expand Up @@ -90,6 +93,7 @@ func (c *Data[T]) GetAll() []DataElement[T] {
func DropNode(nodename string) {
ConfigData.Unset(nodename)
MonitorData.Unset(nodename)
LsnrData.Unset(nodename)
OsPathsData.Unset(nodename)
StatusData.Unset(nodename)
StatsData.Unset(nodename)
Expand All @@ -100,6 +104,7 @@ func DropNode(nodename string) {
func InitData() {
ConfigData = NewData[Config]()
MonitorData = NewData[Monitor]()
LsnrData = NewData[Lsnr]()
OsPathsData = NewData[san.Paths]()
StatusData = NewData[Status]()
StatsData = NewData[Stats]()
Expand Down
18 changes: 18 additions & 0 deletions core/node/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type (
MinAvailSwapPct uint64 `json:"min_avail_swap"`
IsSpeaker bool `json:"is_speaker"`
Labels Labels `json:"labels"`
Lsnr Lsnr `json:"lsnr"`
}

// Instances groups instances configuration digest and status
Expand All @@ -36,6 +37,13 @@ type (
Status status.T `json:"status"`
}

// Lsnr describes the inet listener addr and port
Lsnr struct {
Addr string `json:"addr"`
Port string `json:"port"`
UpdatedAt time.Time `json:"updated_at"`
}

// NodesInfo is the dataset exposed via the GET /nodes_info handler,
// used by nodes to:
// * expand node selector expressions based on labels
Expand All @@ -46,9 +54,18 @@ type (
Env string `json:"env"`
Labels Labels `json:"labels"`
Paths san.Paths `json:"paths"`
Lsnr Lsnr `json:"lsnr"`
}
)

func (l *Lsnr) DeepCopy() *Lsnr {
return &Lsnr{
Addr: l.Addr,
Port: l.Port,
UpdatedAt: l.UpdatedAt,
}
}

func (t Status) IsFrozen() bool {
return !t.FrozenAt.IsZero()
}
Expand All @@ -71,6 +88,7 @@ func (t *Status) DeepCopy() *Status {
}
result.Gen = newGen
result.Labels = t.Labels.DeepCopy()
result.Lsnr = *t.Lsnr.DeepCopy()

return &result
}
Expand Down
14 changes: 8 additions & 6 deletions core/object/node_keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,17 +445,19 @@ var nodeCommonKeywords = []keywords.Keyword{
Text: keywords.NewText(fs, "text/kw/node/listener.dns_sock_gid"),
},
{
Section: "listener",
Option: "addr",
Aliases: []string{"tls_addr"},
Default: "",
Example: "1.2.3.4",
Text: keywords.NewText(fs, "text/kw/node/listener.addr"),
Section: "listener",
Option: "addr",
Aliases: []string{"tls_addr"},
Scopable: true,
Default: "",
Example: "1.2.3.4",
Text: keywords.NewText(fs, "text/kw/node/listener.addr"),
},
{
Section: "listener",
Option: "port",
Aliases: []string{"tls_port"},
Scopable: true,
Converter: converters.Int,
Default: fmt.Sprintf("%d", daemonenv.HttpPort),
Text: keywords.NewText(fs, "text/kw/node/listener.port"),
Expand Down
12 changes: 10 additions & 2 deletions daemon/ccfg/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ func (o *ccfg) getClusterConfig() cluster.Config {
cfg.Quorum = o.clusterConfig.GetBool(keyQuorum)

cfg.Listener.CRL = o.clusterConfig.GetString(keyListenerCRL)
cfg.Listener.Addr = o.clusterConfig.GetString(keyListenerAddr)
cfg.Listener.Port = o.clusterConfig.GetInt(keyListenerPort)
if v, err := o.clusterConfig.Eval(keyListenerAddr); err != nil {
o.log.Error().Err(err).Msg("eval listener port")
} else {
cfg.Listener.Addr = v.(string)
}
if v, err := o.clusterConfig.Eval(keyListenerPort); err != nil {
o.log.Error().Err(err).Msg("eval listener port")
} else {
cfg.Listener.Port = v.(int)
}
cfg.Listener.OpenIdWellKnown = o.clusterConfig.GetString(keyListenerOpenIdWellKnown)
cfg.Listener.DNSSockGID = o.clusterConfig.GetString(keyListenerDNSSockGID)
cfg.Listener.DNSSockUID = o.clusterConfig.GetString(keyListenerDNSSockUID)
Expand Down
8 changes: 8 additions & 0 deletions daemon/daemondata/apply_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (d *data) applyNodeData(msg *hbtype.Msg) error {
func (d *data) refreshPreviousUpdated(peer string) *remoteInfo {
if prev, ok := d.previousRemoteInfo[peer]; ok {
if prev.gen == d.clusterData.Cluster.Node[peer].Status.Gen[peer] {
d.log.Debug().Msgf("refreshPreviousUpdated skipped (already computed gen %d)", prev.gen)
return nil
}
}
Expand Down Expand Up @@ -184,12 +185,19 @@ func (d *data) pubMsgFromNodeStatusDiffForNode(peer string) {
d.bus.Pub(&msgbus.NodeStatusLabelsUpdated{Node: peer, Value: next.Labels.DeepCopy()}, labels...)
changed = true
}
if next.Lsnr.UpdatedAt.After(prev.Lsnr.UpdatedAt) {
node.LsnrData.Set(peer, next.Lsnr.DeepCopy())
d.bus.Pub(&msgbus.ListenerUpdated{Node: peer, Lsnr: *next.Lsnr.DeepCopy()}, labels...)
changed = true
}
if changed || !reflect.DeepEqual(prev, next) {
node.StatusData.Set(peer, next.DeepCopy())
d.bus.Pub(&msgbus.NodeStatusUpdated{Node: peer, Value: *next.DeepCopy()}, labels...)
}
}
onCreate := func() {
node.LsnrData.Set(peer, next.Lsnr.DeepCopy())
d.bus.Pub(&msgbus.ListenerUpdated{Node: peer, Lsnr: *next.Lsnr.DeepCopy()}, labels...)
d.bus.Pub(&msgbus.NodeStatusLabelsUpdated{Node: peer, Value: next.Labels.DeepCopy()}, labels...)
node.StatusData.Set(peer, next.DeepCopy())
d.bus.Pub(&msgbus.NodeStatusUpdated{Node: peer, Value: *next.DeepCopy()}, labels...)
Expand Down
3 changes: 3 additions & 0 deletions daemon/daemondata/apply_patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (d *data) setCacheAndPublish(ev event.Event) error {
case *msgbus.InstanceStatusUpdated:
instance.StatusData.Set(c.Path, c.Node, &c.Value)
d.bus.Pub(c, labelFromPeer)
case *msgbus.ListenerUpdated:
node.LsnrData.Set(c.Node, &c.Lsnr)
d.bus.Pub(c, labelFromPeer)
case *msgbus.NodeConfigUpdated:
node.ConfigData.Set(c.Node, &c.Value)
d.bus.Pub(c, labelFromPeer)
Expand Down
10 changes: 8 additions & 2 deletions daemon/daemondata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type (

data struct {
// previousRemoteInfo map[node] of remoteInfo from clusterData data just
// after commit, it is used to publish diff for other nodes
// after full message applied (used to publish detected diff on full message applied).
previousRemoteInfo map[string]remoteInfo

// clusterData is the live current data (after apply msg from patch or subscription)
Expand Down Expand Up @@ -90,7 +90,7 @@ type (
gens map[string]uint64
eventQueue map[string][]event.Event

// remoteInfo struct holds information about remote node used to publish diff
// remoteInfo struct holds information about remote node used to publish diff on full message received
remoteInfo struct {
nmonUpdated time.Time
nodeStats node.Stats
Expand Down Expand Up @@ -391,6 +391,8 @@ func (d *data) startSubscriptions() {
sub.AddFilter(&msgbus.InstanceStatusUpdated{}, d.labelLocalNode)
sub.AddFilter(&msgbus.InstanceStatusDeleted{}, d.labelLocalNode)

sub.AddFilter(&msgbus.ListenerUpdated{}, d.labelLocalNode)

sub.AddFilter(&msgbus.NodeConfigUpdated{}, d.labelLocalNode)

sub.AddFilter(&msgbus.NodeMonitorDeleted{}, d.labelLocalNode)
Expand Down Expand Up @@ -460,6 +462,10 @@ func (d *data) onSubEvent(i interface{}) error {
if c.Node == d.localNode {
d.appendEv(c)
}
case *msgbus.ListenerUpdated:
if c.Node == d.localNode {
d.appendEv(c)
}
case *msgbus.NodeConfigUpdated:
if c.Node == d.localNode {
d.appendEv(c)
Expand Down
Loading