Skip to content

Commit

Permalink
InfluxDB: publish structs and pointers (#18173)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Jan 11, 2025
1 parent 36f0005 commit 937b436
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 103 deletions.
68 changes: 43 additions & 25 deletions server/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,66 +67,84 @@ func (m *Influx) writePoint(writer pointWriter, key string, fields map[string]an
}

// writeComplexPoint asynchronously writes a point to influx
func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags map[string]string) {
func (m *Influx) writeComplexPoint(writer pointWriter, key string, val any, tags map[string]string) {
fields := make(map[string]any)

switch val := param.Val.(type) {
// loop struct
writeStruct := func(sv any) {
typ := reflect.TypeOf(sv)
val := reflect.ValueOf(sv)

for i := 0; i < typ.NumField(); i++ {
if f := typ.Field(i); f.IsExported() {
if val.Field(i).IsZero() && omitEmpty(f) {
continue
}

key := key + strings.ToUpper(f.Name[:1]) + f.Name[1:]
val := val.Field(i).Interface()

m.writeComplexPoint(writer, key, val, tags)
}
}
}

switch valueType := val.(type) {
case string:
return

case int, int64, float64:
fields["value"] = param.Val
fields["value"] = val

case []float64:
if len(val) != 3 {
if len(valueType) != 3 {
return
}

// add array as phase values
for i, v := range val {
for i, v := range valueType {
fields[fmt.Sprintf("l%d", i+1)] = v
}

case [3]float64:
// add array as phase values
for i, v := range val {
for i, v := range valueType {
fields[fmt.Sprintf("l%d", i+1)] = v
}

default:
// allow writing nil values
if param.Val == nil {
if val == nil {
fields["value"] = nil
break
}

switch typ := reflect.TypeOf(val); {
// pointer
case typ.Kind() == reflect.Ptr:
if val := reflect.ValueOf(val); !val.IsNil() {
m.writeComplexPoint(writer, key, reflect.Indirect(val).Interface(), tags)
}

// struct
case typ.Kind() == reflect.Struct:
writeStruct(val)

// slice of structs
if typ := reflect.TypeOf(param.Val); typ.Kind() == reflect.Slice && typ.Elem().Kind() == reflect.Struct {
val := reflect.ValueOf(param.Val)
case typ.Kind() == reflect.Slice && typ.Elem().Kind() == reflect.Struct:
val := reflect.ValueOf(val)

// loop slice
for i := 0; i < val.Len(); i++ {
val := val.Index(i)
typ := val.Type()

// loop struct
for j := 0; j < typ.NumField(); j++ {
n := typ.Field(j).Name
v := val.Field(j).Interface()

key := param.Key + strings.ToUpper(n[:1]) + n[1:]
fields["value"] = v
tags["id"] = strconv.Itoa(i + 1)

m.writePoint(writer, key, fields, tags)
}
tags["id"] = strconv.Itoa(i + 1)
writeStruct(val.Index(i).Interface())
}
}

return
}

m.writePoint(writer, param.Key, fields, tags)
m.writePoint(writer, key, fields, tags)
}

// Run Influx publisher
Expand All @@ -153,7 +171,7 @@ func (m *Influx) Run(site site.API, in <-chan util.Param) {
}
}

m.writeComplexPoint(writer, param, tags)
m.writeComplexPoint(writer, param.Key, param.Val, tags)
}

m.client.Close()
Expand Down
149 changes: 84 additions & 65 deletions server/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,84 +7,103 @@ import (
"github.com/evcc-io/evcc/util"
inf2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/stretchr/testify/assert"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
)

type influxWriter struct {
t *testing.T
p []*write.Point
idx int
func TestInfluxTypes(t *testing.T) {
suite.Run(t, new(influxSuite))
}

type influxSuite struct {
suite.Suite
*Influx
p []*write.Point
}

func (w *influxWriter) WritePoint(p *write.Point) {
if w.idx >= len(w.p) {
w.t.Fatal("too many points")
func (suite *influxSuite) SetupSuite() {
suite.Influx = &Influx{
log: util.NewLogger("foo"),
clock: clock.NewMock(),
}
}

assert.Equal(w.t, w.p[w.idx], p)
w.idx++
func (suite *influxSuite) SetupTest() {
suite.p = nil
}

func (w *influxWriter) finish() {
assert.Len(w.t, w.p, w.idx, "not enough points")
func (suite *influxSuite) WritePoint(p *write.Point) {
suite.p = append(suite.p, p)
}

func TestInfluxTypes(t *testing.T) {
m := &Influx{
log: util.NewLogger("foo"),
clock: clock.NewMock(),
}
func (suite *influxSuite) WriteParam(p util.Param) {
tags := make(map[string]string)
suite.Influx.writeComplexPoint(suite, p.Key, p.Val, tags)
}

{
// string value
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("foo", nil, map[string]any{"value": 1}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: 1}, nil)
w.finish()
}
func (w *influxSuite) TestString() {
w.WriteParam(util.Param{Key: "foo", Val: 1})
w.Equal([]*write.Point{inf2.NewPoint("foo", nil, map[string]any{"value": 1}, w.clock.Now())}, w.p)
}

{
// nil value - https://github.com/evcc-io/evcc/issues/5950
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("phasesConfigured", nil, map[string]any{"value": nil}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "phasesConfigured", Val: nil}, nil)
w.finish()
}
// bool is not published
// func (w *influxSuite) TestBool() {
// w.WriteParam(util.Param{Key: "foo", Val: false})
// w.Equal([]*write.Point{inf2.NewPoint("foo", nil, map[string]any{"value": "false"}, w.clock.Now())}, w.p)
// }

{
// phases array
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("foo", nil, map[string]any{
"l1": 1.0,
"l2": 2.0,
"l3": 3.0,
}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: [3]float64{1, 2, 3}}, nil)
w.finish()
}
func (w *influxSuite) TestNil() {
// nil value - https://github.com/evcc-io/evcc/issues/5950
w.WriteParam(util.Param{Key: "foo", Val: nil})
w.Equal([]*write.Point{inf2.NewPoint("foo", nil, map[string]any{"value": nil}, w.clock.Now())}, w.p)
}

{
// phases slice
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("foo", nil, map[string]any{
"l1": 1.0,
"l2": 2.0,
"l3": 3.0,
}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: []float64{1, 2, 3}}, nil)
w.finish()
}
func (w *influxSuite) TestPointer() {
w.WriteParam(util.Param{Key: "foo", Val: lo.ToPtr(1)})
w.Equal([]*write.Point{inf2.NewPoint("foo", nil, map[string]any{"value": 1}, w.clock.Now())}, w.p)
}

{
// arbitrary slice
w := &influxWriter{
t: t, p: nil,
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: []float64{1, 2, 3, 4}}, nil)
w.finish()
}
func (w *influxSuite) TestArray() {
// nil value - https://github.com/evcc-io/evcc/issues/5950
w.WriteParam(util.Param{Key: "foo", Val: [3]float64{1, 2, 3}})
w.Equal([]*write.Point{inf2.NewPoint("foo", nil, map[string]any{
"l1": 1.0,
"l2": 2.0,
"l3": 3.0,
}, w.clock.Now())}, w.p)
}

func (w *influxSuite) TestPhasesSlice() {
w.WriteParam(util.Param{Key: "foo", Val: []float64{1, 2, 3}})
w.Equal([]*write.Point{inf2.NewPoint("foo", nil, map[string]any{
"l1": 1.0,
"l2": 2.0,
"l3": 3.0,
}, w.clock.Now())}, w.p)
}

func (w *influxSuite) TestSlice() {
w.WriteParam(util.Param{Key: "foo", Val: []float64{1, 2, 3, 4}})
w.Len(w.p, 0)
}

func (w *influxSuite) TestMeasurement() {
w.WriteParam(util.Param{Key: "battery", Val: measurement{Power: 1, Soc: lo.ToPtr(10.0)}})
w.Equal([]*write.Point{
inf2.NewPoint("batteryPower", nil, map[string]any{"value": 1.0}, w.clock.Now()),
inf2.NewPoint("batterySoc", nil, map[string]any{"value": 10.0}, w.clock.Now()),
}, w.p)
}

func (w *influxSuite) TestSliceOfStruct() {
w.WriteParam(util.Param{Key: "grid", Val: []measurement{
{Power: 1, Soc: lo.ToPtr(10.0)},
{Power: 2, Soc: lo.ToPtr(20.0)},
}})
w.Equal([]*write.Point{
inf2.NewPoint("gridPower", map[string]string{"id": "1"}, map[string]any{"value": 1.0}, w.clock.Now()),
inf2.NewPoint("gridSoc", map[string]string{"id": "1"}, map[string]any{"value": 10.0}, w.clock.Now()),
inf2.NewPoint("gridPower", map[string]string{"id": "2"}, map[string]any{"value": 2.0}, w.clock.Now()),
inf2.NewPoint("gridSoc", map[string]string{"id": "2"}, map[string]any{"value": 20.0}, w.clock.Now()),
}, w.p)
}
4 changes: 2 additions & 2 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (m *MQTT) publishComplex(topic string, retained bool, payload interface{})
}

case reflect.Pointer:
if !reflect.ValueOf(payload).IsNil() {
m.publishComplex(topic, retained, reflect.Indirect(reflect.ValueOf(payload)).Interface())
if val := reflect.ValueOf(payload); !val.IsNil() {
m.publishComplex(topic, retained, reflect.Indirect(val).Interface())
return
}

Expand Down
19 changes: 8 additions & 11 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ type measurement struct {
Power float64 `json:"power"`
Energy float64 `json:"energy,omitempty"`
Currents []float64 `json:"currents,omitempty"`
Soc *float64 `json:"soc,omitempty"`
Controllable *bool `json:"controllable,omitempty"`
}

func TestPublishTypes(t *testing.T) {
func TestMqttTypes(t *testing.T) {
suite.Run(t, new(mqttSuite))
}

Expand Down Expand Up @@ -104,25 +105,21 @@ func (suite *mqttSuite) TestSlice() {
}

func (suite *mqttSuite) TestGrid() {
topics := []string{"test/power", "test/energy", "test/currents", "test/controllable"}
topics := []string{"test/power", "test/energy", "test/currents", "test/soc", "test/controllable"}

suite.publish("test", false, measurement{})
suite.Require().Len(suite.topics, 4)
suite.Equal(topics, suite.topics, "topics")
suite.Equal([]string{"0", "", "", ""}, suite.payloads, "payloads")
suite.Equal([]string{"0", "", "", "", ""}, suite.payloads, "payloads")

suite.publish("test", false, measurement{Energy: 1})
suite.Require().Len(suite.topics, 4)
suite.Equal(topics, suite.topics, "topics")
suite.Equal([]string{"0", "1", "", ""}, suite.payloads, "payloads")
suite.Equal([]string{"0", "1", "", "", ""}, suite.payloads, "payloads")

suite.publish("test", false, measurement{Controllable: lo.ToPtr(false)})
suite.Require().Len(suite.topics, 4)
suite.Equal(topics, suite.topics, "topics")
suite.Equal([]string{"0", "", "", "false"}, suite.payloads, "payloads")
suite.Equal([]string{"0", "", "", "", "false"}, suite.payloads, "payloads")

suite.publish("test", false, measurement{Currents: []float64{1, 2, 3}})
suite.Require().Len(suite.topics, 7)
suite.Equal([]string{"test/power", "test/energy", "test/currents", "test/controllable", "test/currents/1", "test/currents/2", "test/currents/3"}, suite.topics, "topics")
suite.Equal([]string{"0", "", "3", "", "1", "2", "3"}, suite.payloads, "payloads")
suite.Equal(append(topics, "test/currents/1", "test/currents/2", "test/currents/3"), suite.topics, "topics")
suite.Equal([]string{"0", "", "3", "", "", "1", "2", "3"}, suite.payloads, "payloads")
}

0 comments on commit 937b436

Please sign in to comment.