Skip to content

Commit

Permalink
refactor(linux): rework networkConnectionSensor for channels pt 3
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuar committed Dec 11, 2023
1 parent 80ac6e3 commit 44f5375
Showing 1 changed file with 66 additions and 35 deletions.
101 changes: 66 additions & 35 deletions internal/linux/networkConnectionSensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"

Check failure on line 10 in internal/linux/networkConnectionSensor.go

View workflow job for this annotation

GitHub Actions / build

could not import slices (package slices is not in GOROOT (/opt/hostedtoolcache/go/1.20.11/x64/src/slices))
"strings"

"github.com/davecgh/go-spew/spew"
"github.com/godbus/dbus/v5"
"github.com/iancoleman/strcase"
"github.com/joshuar/go-hass-agent/internal/tracker"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (c *connection) State() interface{} {
func (c *connection) monitorConnectionState(ctx context.Context) chan tracker.Sensor {
log.Debug().Str("connection", c.Name()).Str("path", string(c.path)).
Msg("Monitoring connection state.")
sensorCh := make(chan tracker.Sensor)
sensorCh := make(chan tracker.Sensor, 1)
err := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus).
Match([]dbus.MatchOption{
dbus.WithMatchObjectPath(dbusNMActiveConnPath),
Expand All @@ -99,8 +100,11 @@ func (c *connection) monitorConnectionState(ctx context.Context) chan tracker.Se
if ok {
state, ok := props["State"]
if ok {
c.state = dbushelpers.VariantToValue[connState](state)
sensorCh <- c
currentState := dbushelpers.VariantToValue[connState](state)
if c.state != currentState {
c.state = currentState
sensorCh <- c
}
if dbushelpers.VariantToValue[uint32](state) == 4 {
log.Debug().Str("connection", c.Name()).Str("path", string(c.path)).
Msg("Unmonitoring connection state.")
Expand All @@ -121,7 +125,7 @@ func (c *connection) monitorConnectionState(ctx context.Context) chan tracker.Se
func (c *connection) monitorAddresses(ctx context.Context) chan tracker.Sensor {
log.Debug().Str("connection", c.Name()).Str("path", string(c.path)).
Msg("Monitoring address changes.")
sensorCh := make(chan tracker.Sensor)
sensorCh := make(chan tracker.Sensor, 1)
go func() {
r := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus).
Path(c.path).
Expand Down Expand Up @@ -155,13 +159,21 @@ func (c *connection) monitorAddresses(ctx context.Context) chan tracker.Sensor {
for k, v := range props {
switch k {
case "Ip4Config":
c.attrs.Ipv4, c.attrs.IPv4Mask = getAddr(ctx, 4, dbushelpers.VariantToValue[dbus.ObjectPath](v))
addr, mask := getAddr(ctx, 4, dbushelpers.VariantToValue[dbus.ObjectPath](v))
if addr != c.attrs.Ipv4 {
c.attrs.Ipv4 = addr
c.attrs.IPv4Mask = mask
sensorCh <- c
}
case "Ip6Config":
c.attrs.Ipv6, c.attrs.IPv6Mask = getAddr(ctx, 6, dbushelpers.VariantToValue[dbus.ObjectPath](v))
addr, mask := getAddr(ctx, 6, dbushelpers.VariantToValue[dbus.ObjectPath](v))
if addr != c.attrs.Ipv6 {
c.attrs.Ipv6 = addr
c.attrs.IPv6Mask = mask
sensorCh <- c
}
}
}
sensorCh <- c
log.Debug().Msg("address changed")
}()
}
}).
Expand Down Expand Up @@ -276,7 +288,26 @@ func getActiveConnections(ctx context.Context) []dbus.ObjectPath {
return dbushelpers.VariantToValue[[]dbus.ObjectPath](v)
}

func monitorActiveConnections(ctx context.Context, sensorCh chan tracker.Sensor, conns []dbus.ObjectPath) {
func monitorActiveConnections(ctx context.Context) chan tracker.Sensor {
sensorCh := make(chan tracker.Sensor, 1)
conns := getActiveConnections(ctx)

handleConn := func(path dbus.ObjectPath) {
conn := newConnection(ctx, path)
sensorCh <- conn
for c := range conn.monitor(ctx) {
sensorCh <- c
}
if i := slices.Index(conns, path); i > 0 {
slices.Delete(conns, i, i)
}
}

spew.Dump(conns)
for _, p := range conns {
go handleConn(p)
}

err := dbushelpers.NewBusRequest(ctx, dbushelpers.SystemBus).
Match([]dbus.MatchOption{
dbus.WithMatchPathNamespace(dbusNMActiveConnPath),
Expand All @@ -287,43 +318,43 @@ func monitorActiveConnections(ctx context.Context, sensorCh chan tracker.Sensor,
return
}
if !slices.Contains(conns, s.Path) {
conn := newConnection(ctx, s.Path)
go func() {
for m := range conn.monitor(ctx) {
sensorCh <- m
}
}()
conns = append(conns, s.Path)
sensorCh <- conn
go handleConn(s.Path)
}
}).
AddWatch(ctx)
if err != nil {
log.Error().Err(err).
Msg("Failed to create connection state change D-Bus watch.")
close(sensorCh)
}
}

func NetworkConnectionsUpdater(ctx context.Context) chan tracker.Sensor {
sensorCh := make(chan tracker.Sensor, 1)

conns := getActiveConnections(ctx)
go func() {
for _, p := range conns {
conn := newConnection(ctx, p)
go func() {
for m := range conn.monitor(ctx) {
sensorCh <- m
}
}()
sensorCh <- conn
}
}()
go monitorActiveConnections(ctx, sensorCh, conns)
go func() {
defer close(sensorCh)
<-ctx.Done()
log.Debug().Msg("Stopped network connection state sensors.")
}()
return sensorCh
}

func NetworkConnectionsUpdater(ctx context.Context) chan tracker.Sensor {
// sensorCh := make(chan tracker.Sensor, 1)

// conns := getActiveConnections(ctx)
// go func() {
// for _, p := range conns {
// conn := newConnection(ctx, p)
// go func() {
// for m := range conn.monitor(ctx) {
// sensorCh <- m
// }
// }()
// sensorCh <- conn
// }
// }()
return monitorActiveConnections(ctx)
// go func() {
// defer close(sensorCh)
// <-ctx.Done()
// log.Debug().Msg("Stopped network connection state sensors.")
// }()
// return sensorCh
}

0 comments on commit 44f5375

Please sign in to comment.