diff --git a/datastore/domotik/application.go b/datastore/domotik/application.go new file mode 100644 index 0000000..241be7b --- /dev/null +++ b/datastore/domotik/application.go @@ -0,0 +1,63 @@ +package main + +import ( + "time" + + "github.com/sylvek/domotik/datastore/domain" + "github.com/sylvek/domotik/datastore/port" +) + +type Application struct { + input domain.Input + state domain.State + stateRepository port.StateRepository + logRepository port.LogRepository + notificationRepository port.NotificationRepository +} + +func (a *Application) AddLog(l domain.Log) error { + if err := a.logRepository.Store(l); err != nil { + return err + } + + a.input.UpdateFromLog(l) + + return nil +} + +func (a *Application) Process() error { + if a.input.HasIndice() { + newState, output := domain.GenerateStatistics(time.Now(), a.state, a.input) + + if err := a.stateRepository.Store(newState); err != nil { + return err + } + if err := a.notificationRepository.Notify(output); err != nil { + return err + } + + a.input.ResetIndice() + a.state = newState + } + + return nil +} + +func NewApplication( + stateRepository port.StateRepository, + logRepository port.LogRepository, + notificationRepository port.NotificationRepository) (*Application, error) { + + state, err := stateRepository.Retrieve() + if err != nil { + return nil, err + } + + return &Application{ + input: *domain.NewInput(), + state: state, + stateRepository: stateRepository, + logRepository: logRepository, + notificationRepository: notificationRepository, + }, nil +} diff --git a/datastore/domotik/domain/application.go b/datastore/domotik/domain/application.go deleted file mode 100644 index 7c9bdb1..0000000 --- a/datastore/domotik/domain/application.go +++ /dev/null @@ -1,129 +0,0 @@ -package domain - -import ( - "time" - - "github.com/sylvek/domotik/datastore/domain/model" - "github.com/sylvek/domotik/datastore/port" -) - -type Application struct { - input model.Input - state model.State - stateRepository port.StateRepository - logRepository port.LogRepository - notificationRepository port.NotificationRepository -} - -func (a *Application) AddLog(l model.Log) error { - if err := a.logRepository.Store(l); err != nil { - return err - } - - if l.Name == "linky" && l.Unit == "indice" { - a.input.Indice = int64(l.Value) - } - - if l.Name == "linky" && l.Unit == "state" { - a.input.LowTariff = l.Value == 0.0 - } - - return nil -} - -func (a *Application) Process() error { - if a.input.Indice > 0 { - newState, output := execute(time.Now(), a.state, a.input) - a.input.Indice = 0 - a.state = newState - - if err := a.stateRepository.Store(newState); err != nil { - return err - } - if err := a.notificationRepository.Notify(output); err != nil { - return err - } - } - - return nil -} - -func NewApplication( - stateRepository port.StateRepository, - logRepository port.LogRepository, - notificationRepository port.NotificationRepository) (*Application, error) { - - state, err := stateRepository.Retrieve() - if err != nil { - return nil, err - } - - return &Application{ - input: model.Input{LowTariff: false, Indice: 0}, - state: state, - stateRepository: stateRepository, - logRepository: logRepository, - notificationRepository: notificationRepository, - }, nil -} - -func execute(t time.Time, state model.State, input model.Input) (model.State, model.Output) { - now := t.Unix() - - lastIndice := state.LastIndice - lastEpoch := state.LastIndiceTS - newIndice := input.Indice - - // calculate watt consumed between 2 Ticks - wattConsumedDuringBuffering := int64(0) - if lastIndice > 0 { - wattConsumedDuringBuffering = newIndice - lastIndice - } - state.LastIndice = newIndice - - minutesSinceTheLastIndice := float64(1) - if wattConsumedDuringBuffering > 0 { - if lastEpoch > 0 { - minutesSinceTheLastIndice = float64(now-lastEpoch) / 60 - } - state.LastIndiceTS = now - } - - // if new day -> clear daily state - if t.Day() != state.CurrentDay { - state.CurrentDay = t.Day() - state.DailySumHigh = 0 - state.DailySumLow = 0 - } - - // if new hour -> clear hourly state - if t.Hour() != state.CurrentHour { - state.CurrentHour = t.Hour() - state.HourlyNbIndices = 0 - state.HourlySum = 0 - } - - // calculate mean per hour - state.HourlySum += wattConsumedDuringBuffering - state.HourlyNbIndices += 1 - - // calculate ratio low/high and € - if input.LowTariff { - state.DailySumLow += wattConsumedDuringBuffering - } else { - state.DailySumHigh += wattConsumedDuringBuffering - } - - ratioLowTariffToday := 1.0 - if state.DailySumHigh > 0 { - ratioLowTariffToday = float64(state.DailySumLow) / float64(state.DailySumHigh+state.DailySumLow) - } - - return state, model.Output{ - WattPerHourForLastMinute: float64(wattConsumedDuringBuffering) * 60 / minutesSinceTheLastIndice, - WattPerHourForThisHour: state.HourlySum * 60 / state.HourlyNbIndices, - WattConsumedToday: state.DailySumHigh + state.DailySumLow, - EuroSpentToday: float64(state.DailySumHigh)*0.0001963 + float64(state.DailySumLow)*0.0001457, - RatioLowTariffToday: ratioLowTariffToday, - } -} diff --git a/datastore/domotik/domain/input.entity.go b/datastore/domotik/domain/input.entity.go new file mode 100644 index 0000000..429dc0a --- /dev/null +++ b/datastore/domotik/domain/input.entity.go @@ -0,0 +1,27 @@ +package domain + +type Input struct { + lowTariff bool + indice int64 +} + +func NewInput() *Input { + return &Input{lowTariff: false, indice: 0} +} + +func (i *Input) UpdateFromLog(l Log) { + if l.name == "linky" && l.unit == "indice" { + i.indice = int64(l.value) + } + if l.name == "linky" && l.unit == "state" { + i.lowTariff = l.value == 0.0 + } +} + +func (i *Input) ResetIndice() { + i.indice = 0 +} + +func (i *Input) HasIndice() bool { + return i.indice > 0 +} diff --git a/datastore/domotik/domain/log.entity.go b/datastore/domotik/domain/log.entity.go new file mode 100644 index 0000000..1541a07 --- /dev/null +++ b/datastore/domotik/domain/log.entity.go @@ -0,0 +1,33 @@ +package domain + +type Log struct { + topic string + name string + unit string + value float64 +} + +func NewLog(elements []string, value float64) *Log { + return &Log{ + topic: elements[0], + name: elements[1], + unit: elements[2], + value: value, + } +} + +func (l *Log) GetTopic() string { + return l.topic +} + +func (l *Log) GetName() string { + return l.name +} + +func (l *Log) GetUnit() string { + return l.unit +} + +func (l *Log) GetValue() float64 { + return l.value +} diff --git a/datastore/domotik/domain/model/input.go b/datastore/domotik/domain/model/input.go deleted file mode 100644 index c17f89a..0000000 --- a/datastore/domotik/domain/model/input.go +++ /dev/null @@ -1,6 +0,0 @@ -package model - -type Input struct { - LowTariff bool - Indice int64 -} diff --git a/datastore/domotik/domain/model/log.go b/datastore/domotik/domain/model/log.go deleted file mode 100644 index bf4a435..0000000 --- a/datastore/domotik/domain/model/log.go +++ /dev/null @@ -1,8 +0,0 @@ -package model - -type Log struct { - Topic string - Name string - Unit string - Value float64 -} diff --git a/datastore/domotik/domain/model/output.go b/datastore/domotik/domain/model/output.go deleted file mode 100644 index 81194a7..0000000 --- a/datastore/domotik/domain/model/output.go +++ /dev/null @@ -1,9 +0,0 @@ -package model - -type Output struct { - WattPerHourForLastMinute float64 - WattPerHourForThisHour int64 - WattConsumedToday int64 - EuroSpentToday float64 - RatioLowTariffToday float64 -} diff --git a/datastore/domotik/domain/model/state.go b/datastore/domotik/domain/model/state.go deleted file mode 100644 index ba73783..0000000 --- a/datastore/domotik/domain/model/state.go +++ /dev/null @@ -1,12 +0,0 @@ -package model - -type State struct { - LastIndice int64 - LastIndiceTS int64 - CurrentDay int - CurrentHour int - DailySumHigh int64 - DailySumLow int64 - HourlySum int64 - HourlyNbIndices int64 -} diff --git a/datastore/domotik/domain/output.entity.go b/datastore/domotik/domain/output.entity.go new file mode 100644 index 0000000..3124211 --- /dev/null +++ b/datastore/domotik/domain/output.entity.go @@ -0,0 +1,39 @@ +package domain + +type Output struct { + wattPerHourForLastMinute float64 + wattPerHourForThisHour int64 + wattConsumedToday int64 + euroSpentToday float64 + ratioLowTariffToday float64 +} + +func NewOutput(state State, consumptionSinceLastTime int64, minutesSinceTheLastIndice float64, ratioLowTariffToday float64) *Output { + return &Output{ + wattPerHourForLastMinute: float64(consumptionSinceLastTime) * 60 / minutesSinceTheLastIndice, + wattPerHourForThisHour: state.hourlySum * 60 / state.hourlyNbIndices, + wattConsumedToday: state.dailySumHigh + state.dailySumLow, + euroSpentToday: float64(state.dailySumHigh)*0.0001963 + float64(state.dailySumLow)*0.0001457, + ratioLowTariffToday: ratioLowTariffToday, + } +} + +func (o *Output) GetEuroSpentToday() float64 { + return o.euroSpentToday +} + +func (o *Output) GetRatioLowTariffToday() float64 { + return o.ratioLowTariffToday +} + +func (o *Output) GetWattConsumedToday() float64 { + return float64(o.wattConsumedToday) +} + +func (o *Output) GetWattPerHourForThisHour() float64 { + return float64(o.wattPerHourForThisHour) +} + +func (o *Output) GetWattPerHourForLastMinute() float64 { + return float64(o.wattPerHourForLastMinute) +} diff --git a/datastore/domotik/domain/state.entity.go b/datastore/domotik/domain/state.entity.go new file mode 100644 index 0000000..36b3634 --- /dev/null +++ b/datastore/domotik/domain/state.entity.go @@ -0,0 +1,81 @@ +package domain + +import "time" + +type State struct { + lastIndice int64 + lastIndiceTS int64 + currentDay int + currentHour int + dailySumHigh int64 + dailySumLow int64 + hourlySum int64 + hourlyNbIndices int64 +} + +func NewState() *State { + return &State{ + lastIndice: 0, + currentDay: 0, + currentHour: 0, + dailySumHigh: 0, + dailySumLow: 0, + hourlySum: 0, + hourlyNbIndices: 0, + } +} + +func (s *State) GetConsumptionSinceLastTime(now int64, newIndice int64) (int64, float64) { + consumptionSinceLastTime := int64(0) + if s.lastIndice > 0 { + consumptionSinceLastTime = newIndice - s.lastIndice + } + s.lastIndice = newIndice + + minutesSinceTheLastIndice := float64(1) + if consumptionSinceLastTime > 0 { + if s.lastIndiceTS > 0 { + minutesSinceTheLastIndice = float64(now-s.lastIndiceTS) / 60 + } + s.lastIndiceTS = now + } + + return consumptionSinceLastTime, minutesSinceTheLastIndice +} + +func (s *State) ProcessNewDay(t time.Time) { + if t.Day() != s.currentDay { + s.currentDay = t.Day() + s.dailySumHigh = 0 + s.dailySumLow = 0 + } +} + +func (s *State) ProcessNewHour(t time.Time) { + if t.Hour() != s.currentHour { + s.currentHour = t.Hour() + s.hourlyNbIndices = 0 + s.hourlySum = 0 + } +} + +func (s *State) IncHourlyConsumption(wattConsumedDuringBuffering int64) { + s.hourlySum += wattConsumedDuringBuffering + s.hourlyNbIndices += 1 +} + +func (s *State) IncDailyConsumption(wattConsumedDuringBuffering int64, lowTariff bool) { + if lowTariff { + s.dailySumLow += wattConsumedDuringBuffering + } else { + s.dailySumHigh += wattConsumedDuringBuffering + } +} + +func (s *State) GetLowTariffRate() float64 { + ratioLowTariffToday := 1.0 + if s.dailySumHigh > 0 { + ratioLowTariffToday = float64(s.dailySumLow) / float64(s.dailySumHigh+s.dailySumLow) + } + return ratioLowTariffToday +} diff --git a/datastore/domotik/domain/statistics.service.go b/datastore/domotik/domain/statistics.service.go new file mode 100644 index 0000000..b76ebe5 --- /dev/null +++ b/datastore/domotik/domain/statistics.service.go @@ -0,0 +1,20 @@ +package domain + +import ( + "time" +) + +func GenerateStatistics(t time.Time, state State, input Input) (State, Output) { + + consumptionSinceLastTime, minutesSinceTheLastIndice := state.GetConsumptionSinceLastTime(t.Unix(), input.indice) + + state.ProcessNewDay(t) + state.ProcessNewHour(t) + + state.IncHourlyConsumption(consumptionSinceLastTime) + state.IncDailyConsumption(consumptionSinceLastTime, input.lowTariff) + + ratioLowTariffToday := state.GetLowTariffRate() + + return state, *NewOutput(state, consumptionSinceLastTime, minutesSinceTheLastIndice, ratioLowTariffToday) +} diff --git a/datastore/domotik/infrastructure/local.repository.go b/datastore/domotik/infrastructure/local.repository.go index 2710c2a..9975a30 100644 --- a/datastore/domotik/infrastructure/local.repository.go +++ b/datastore/domotik/infrastructure/local.repository.go @@ -4,7 +4,7 @@ import ( "encoding/json" "os" - "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/domain" "github.com/sylvek/domotik/datastore/port" ) @@ -15,23 +15,15 @@ type LocalClient struct { const FILE = "/state.json" // Retrieve implements port.StateRepository. -func (l *LocalClient) Retrieve() (model.State, error) { - state := model.State{ - LastIndice: 0, - CurrentDay: 0, - CurrentHour: 0, - DailySumHigh: 0, - DailySumLow: 0, - HourlySum: 0, - HourlyNbIndices: 0, - } +func (l *LocalClient) Retrieve() (domain.State, error) { + state := domain.NewState() data, err := os.ReadFile(l.path + FILE) if err == nil { json.Unmarshal(data, &state) } // if the file does not exist we return the empty state - return state, nil + return *state, nil } // Close implements port.StateRepository. @@ -39,7 +31,7 @@ func (l *LocalClient) Close() { } // Store implements port.StateRepository. -func (l *LocalClient) Store(state model.State) error { +func (l *LocalClient) Store(state domain.State) error { s, _ := json.Marshal(state) return os.WriteFile(l.path+FILE, s, 0644) } diff --git a/datastore/domotik/infrastructure/mqtt.repository.go b/datastore/domotik/infrastructure/mqtt.repository.go index e864561..6a56146 100644 --- a/datastore/domotik/infrastructure/mqtt.repository.go +++ b/datastore/domotik/infrastructure/mqtt.repository.go @@ -5,7 +5,7 @@ import ( "log" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/domain" "github.com/sylvek/domotik/datastore/port" ) @@ -20,12 +20,12 @@ func (m *MqttClient) Close() { } // Notify implements port.NotificationRepository. -func (m *MqttClient) Notify(output model.Output) error { - m.publish("sensors", "sumPerDay", output.EuroSpentToday, "euro") - m.publish("sensors", "sumPerDay", output.RatioLowTariffToday, "rate") - m.publish("sensors", "sumPerDay", float64(output.WattConsumedToday), "watt") - m.publish("sensors", "meanPerHour", float64(output.WattPerHourForThisHour), "watt") - m.publish("sensors", "meanPerMinute", float64(output.WattPerHourForLastMinute), "watt") +func (m *MqttClient) Notify(output domain.Output) error { + m.publish("sensors", "sumPerDay", output.GetEuroSpentToday(), "euro") + m.publish("sensors", "sumPerDay", output.GetRatioLowTariffToday(), "rate") + m.publish("sensors", "sumPerDay", output.GetWattConsumedToday(), "watt") + m.publish("sensors", "meanPerHour", output.GetWattPerHourForThisHour(), "watt") + m.publish("sensors", "meanPerMinute", output.GetWattPerHourForLastMinute(), "watt") return nil } diff --git a/datastore/domotik/infrastructure/sqlite.repository.go b/datastore/domotik/infrastructure/sqlite.repository.go index ed73516..eeb5405 100644 --- a/datastore/domotik/infrastructure/sqlite.repository.go +++ b/datastore/domotik/infrastructure/sqlite.repository.go @@ -9,7 +9,7 @@ import ( "strconv" "time" - "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/domain" "github.com/sylvek/domotik/datastore/port" _ "github.com/glebarez/go-sqlite" @@ -64,7 +64,7 @@ func (d *SqliteClient) Close() { } // Store implements port.LogRepository. -func (d *SqliteClient) Store(l model.Log) error { +func (d *SqliteClient) Store(l domain.Log) error { t := time.Now() currentDayOfYear := t.YearDay() @@ -80,13 +80,13 @@ func (d *SqliteClient) Store(l model.Log) error { log.Println("daily work is finished") } - db := d.instances[l.Topic].db + db := d.instances[l.GetTopic()].db _, err := db.Exec("INSERT INTO data (ts, name, unit, value) VALUES (?, ?, ?, ?)", // Grafana requires that data is stored in UTC t.Unix(), - l.Name, - l.Unit, - l.Value) + l.GetName(), + l.GetUnit(), + l.GetValue()) if err != nil { return err } diff --git a/datastore/domotik/main.go b/datastore/domotik/main.go index d3b29d0..7a7aa78 100644 --- a/datastore/domotik/main.go +++ b/datastore/domotik/main.go @@ -10,7 +10,6 @@ import ( "time" "github.com/sylvek/domotik/datastore/domain" - "github.com/sylvek/domotik/datastore/domain/model" "github.com/sylvek/domotik/datastore/infrastructure" ) @@ -34,7 +33,7 @@ func main() { log.Println("ciao.") }() - application, err := domain.NewApplication(localRepository, sqliteRepository, mqttRepository) + application, err := NewApplication(localRepository, sqliteRepository, mqttRepository) if err != nil { log.Fatal(err) } @@ -53,7 +52,7 @@ func main() { elements := strings.Split(msg.Topic(), "/") value, _ := strconv.ParseFloat(payload, 64) - err := application.AddLog(model.Log{Topic: elements[0], Name: elements[1], Unit: elements[2], Value: value}) + err := application.AddLog(*domain.NewLog(elements, value)) if err != nil { log.Printf(" - error - %s", err) time.Sleep(time.Second) diff --git a/datastore/domotik/port/log.repository.go b/datastore/domotik/port/log.repository.go index 0ddb35c..79281e2 100644 --- a/datastore/domotik/port/log.repository.go +++ b/datastore/domotik/port/log.repository.go @@ -1,8 +1,8 @@ package port -import "github.com/sylvek/domotik/datastore/domain/model" +import "github.com/sylvek/domotik/datastore/domain" type LogRepository interface { Repository - Store(model.Log) error + Store(domain.Log) error } diff --git a/datastore/domotik/port/notification.repository.go b/datastore/domotik/port/notification.repository.go index 6e6bfe2..6ccef3a 100644 --- a/datastore/domotik/port/notification.repository.go +++ b/datastore/domotik/port/notification.repository.go @@ -1,8 +1,8 @@ package port -import "github.com/sylvek/domotik/datastore/domain/model" +import "github.com/sylvek/domotik/datastore/domain" type NotificationRepository interface { Repository - Notify(model.Output) error + Notify(domain.Output) error } diff --git a/datastore/domotik/port/state.repository.go b/datastore/domotik/port/state.repository.go index 812bcfe..128d293 100644 --- a/datastore/domotik/port/state.repository.go +++ b/datastore/domotik/port/state.repository.go @@ -1,9 +1,9 @@ package port -import "github.com/sylvek/domotik/datastore/domain/model" +import "github.com/sylvek/domotik/datastore/domain" type StateRepository interface { Repository - Store(model.State) error - Retrieve() (model.State, error) + Store(domain.State) error + Retrieve() (domain.State, error) }