Skip to content

Commit

Permalink
perf(hass): ⚡ rework sending requests
Browse files Browse the repository at this point in the history
- move response processing logic to response objects
- rearrange code locations
  • Loading branch information
joshuar committed Oct 24, 2024
1 parent 077c6a5 commit a8cd590
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 590 deletions.
258 changes: 53 additions & 205 deletions internal/hass/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT

//revive:disable:max-public-structs
//go:generate go run github.com/matryer/moq -out client_mocks_test.go . PostRequest Registry
package hass

import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
Expand All @@ -28,7 +25,10 @@ const (
DefaultTimeout = 30 * time.Second
)

var tracker = sensor.NewTracker()
var (
sensorRegistry Registry
sensorTracker *sensor.Tracker
)

var (
ErrGetConfigFailed = errors.New("could not fetch Home Assistant config")
Expand All @@ -54,26 +54,6 @@ var (
}
)

// GetRequest is a HTTP GET request.
type GetRequest any

// PostRequest is a HTTP POST request with the request body provided by Body().
type PostRequest interface {
RequestBody() json.RawMessage
}

// Authenticated represents a request that requires passing an authentication
// header with the value returned by Auth().
type Authenticated interface {
Auth() string
}

// Encrypted represents a request that should be encrypted with the secret
// provided by Secret().
type Encrypted interface {
Secret() string
}

type Registry interface {
SetDisabled(id string, state bool) error
SetRegistered(id string, state bool) error
Expand All @@ -83,22 +63,19 @@ type Registry interface {

type Client struct {
endpoint *resty.Client
registry Registry
}

func NewClient(ctx context.Context) (*Client, error) {
var err error

reg, err := registry.Load(ctx)
sensorTracker = sensor.NewTracker()

sensorRegistry, err = registry.Load(ctx)
if err != nil {
return nil, fmt.Errorf("could not start registry: %w", err)
}

client := &Client{
registry: reg,
}

return client, nil
return &Client{}, nil
}

func (c *Client) Endpoint(url string, timeout time.Duration) {
Expand Down Expand Up @@ -129,149 +106,74 @@ func (c *Client) ProcessEvent(ctx context.Context, details event.Event) error {
req := &request{Data: details, RequestType: requestTypeEvent}

if err := req.Validate(); err != nil {
return fmt.Errorf("validation failed: %w", err)
return fmt.Errorf("invalid event request: %w", err)
}

_, err := send[eventResponse](ctx, c, req)
resp, err := send[response](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send event request: %w", err)
}

return nil
}

func (c *Client) ProcessSensor(ctx context.Context, details sensor.Entity) error {
if c.isDisabled(ctx, details) {
logging.FromContext(ctx).
Debug("Not sending request for disabled sensor.",
sensorLogAttrs(details))

return nil
}

if _, ok := details.Value.(*LocationRequest); ok {
// LocationRequest:
return c.handleLocationUpdate(ctx, details)
}

if c.registry.IsRegistered(details.ID) {
// Sensor Update (existing sensor).
return c.handleSensorUpdate(ctx, details)
}
// Sensor Registration (new sensor).
return c.handleRegistration(ctx, details)
}

func (c *Client) handleLocationUpdate(ctx context.Context, details sensor.Entity) error {
// req, err := sensor.NewLocationUpdateRequest(details)
req, err := newEntityRequest(requestTypeLocation, details)
if err != nil {
return fmt.Errorf("unable to handle location update: %w", err)
}

resp, err := send[locationResponse](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send location update request: %w", err)
}

if err := resp.updated(); err != nil { //nolint:staticcheck
return fmt.Errorf("location update failed: %w", err)
if _, err := resp.Status(); err != nil {
return err
}

return nil
}

func (c *Client) handleSensorUpdate(ctx context.Context, details sensor.Entity) error {
// req, err := sensor.NewUpdateRequest(details)
req, err := newEntityRequest(requestTypeUpdate, details)
if err != nil {
return fmt.Errorf("unable to handle sensor update: %w", err)
}

response, err := send[stateUpdateResponse](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send sensor update request for %s: %w", details.ID, err)
}
func (c *Client) ProcessSensor(ctx context.Context, details sensor.Entity) error {
req := &request{}

if response == nil {
return ErrStateUpdateUnknown
if _, ok := details.Value.(*LocationRequest); ok {
req = &request{Data: details.Value, RequestType: requestTypeLocation}
}

// At this point, the sensor update was successful. Any errors are really
// warnings and non-critical.
var warnings error

for id, update := range response {
success, err := update.success()
if !success {
// The update failed.
warnings = errors.Join(warnings, err)
}

// If HA reports the sensor as disabled, update the registry.
if c.registry.IsDisabled(id) != update.disabled() {
if sensorRegistry.IsRegistered(details.ID) {
// For sensor updates, if the sensor is disabled, don't continue.
if c.isDisabled(ctx, details) {
logging.FromContext(ctx).
Info("Sensor is disabled in Home Assistant. Setting disabled in local registry.",
Debug("Not sending request for disabled sensor.",
sensorLogAttrs(details))

if err := c.registry.SetDisabled(id, update.disabled()); err != nil {
warnings = errors.Join(warnings, fmt.Errorf("%w: %w", ErrRegDisableFailed, err))
}
}

// Add the sensor update to the tracker.
if err := tracker.Add(&details); err != nil {
warnings = errors.Join(warnings, fmt.Errorf("%w: %w", ErrTrkUpdateFailed, err))
return nil
}
}

if warnings != nil {
logging.FromContext(ctx).
Debug("Sensor updated with warnings.",
sensorLogAttrs(details),
slog.Any("warnings", warnings))
req = &request{Data: details.State, RequestType: requestTypeUpdate}
} else {
logging.FromContext(ctx).
Debug("Sensor updated.",
sensorLogAttrs(details))
req = &request{Data: details, RequestType: requestTypeRegister}
}
// Return success status and any warnings.
return warnings
}

func (c *Client) handleRegistration(ctx context.Context, details sensor.Entity) error {
req, err := newEntityRequest(requestTypeRegister, details)
if err != nil {
return fmt.Errorf("unable to handle sensor update: %w", err)
if err := req.Validate(); err != nil {
return fmt.Errorf("invalid sensor request: %w", err)
}

response, err := send[registrationResponse](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send sensor registration request for %s: %w", details.ID, err)
}
switch req.RequestType {
case requestTypeLocation:
resp, err := send[response](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send location update: %w", err)
}

// If the registration failed, log a warning.
success, err := response.registered()
if !success {
return errors.Join(ErrRegistrationFailed, err)
}
if _, err := resp.Status(); err != nil {
return err
}
case requestTypeUpdate:
resp, err := send[bulkSensorUpdateResponse](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send location update: %w", err)
}

// At this point, the sensor registration was successful. Any errors are really
// warnings and non-critical.
var warnings error
go resp.Process(ctx, details)
case requestTypeRegister:
resp, err := send[registrationResponse](ctx, c, req)
if err != nil {
return fmt.Errorf("failed to send location update: %w", err)
}

// Set the sensor as registered in the registry.
err = c.registry.SetRegistered(details.ID, true)
if err != nil {
warnings = errors.Join(warnings, fmt.Errorf("%w: %w", ErrRegAddFailed, err))
}
// Update the sensor state in the tracker.
if err := tracker.Add(&details); err != nil {
warnings = errors.Join(warnings, fmt.Errorf("%w: %w", ErrTrkUpdateFailed, err))
go resp.Process(ctx, details)
}

// Return success status and any warnings.
return warnings
return nil
}

// isDisabled handles processing a sensor that is disabled. For a sensor that is
Expand All @@ -291,7 +193,7 @@ func (c *Client) isDisabled(ctx context.Context, details sensor.Entity) bool {
if !disabledInHA {
slog.Info("Sensor re-enabled in Home Assistant, Re-enabling in local registry and sending updates.", sensorLogAttrs(details))

if err := c.registry.SetDisabled(details.ID, false); err != nil {
if err := sensorRegistry.SetDisabled(details.ID, false); err != nil {
slog.Error("Could not re-enable sensor.",
sensorLogAttrs(details),
slog.Any("error", err))
Expand All @@ -309,8 +211,10 @@ func (c *Client) isDisabled(ctx context.Context, details sensor.Entity) bool {

// isDisabledInReg returns the disabled state of the sensor from the local
// registry.
//
//revive:disable:unused-receiver
func (c *Client) isDisabledInReg(id string) bool {
return c.registry.IsDisabled(id)
return sensorRegistry.IsDisabled(id)
}

// isDisabledInHA returns the disabled state of the sensor from Home Assistant.
Expand Down Expand Up @@ -338,64 +242,8 @@ func (c *Client) isDisabledInHA(ctx context.Context, details sensor.Entity) bool
return status
}

func send[T any](ctx context.Context, client *Client, requestDetails any) (T, error) {
var (
response T
responseErr apiError
responseObj *resty.Response
)

if client.endpoint == nil {
return response, ErrInvalidClient
}

requestObj := client.endpoint.R().SetContext(ctx)
requestObj = requestObj.SetError(&responseErr)
requestObj = requestObj.SetResult(&response)

// If the request is authenticated, set the auth header with the token.
if a, ok := requestDetails.(Authenticated); ok {
requestObj = requestObj.SetAuthToken(a.Auth())
}

switch req := requestDetails.(type) {
case PostRequest:
logging.FromContext(ctx).
LogAttrs(ctx, logging.LevelTrace,
"Sending request.",
slog.String("method", "POST"),
slog.String("body", string(req.RequestBody())),
slog.Time("sent_at", time.Now()))

responseObj, _ = requestObj.SetBody(req.RequestBody()).Post("") //nolint:errcheck // error is checked with responseObj.IsError()
case GetRequest:
logging.FromContext(ctx).
LogAttrs(ctx, logging.LevelTrace,
"Sending request.",
slog.String("method", "GET"),
slog.Time("sent_at", time.Now()))

responseObj, _ = requestObj.Get("") //nolint:errcheck // error is checked with responseObj.IsError()
}

logging.FromContext(ctx).
LogAttrs(ctx, logging.LevelTrace,
"Received response.",
slog.Int("statuscode", responseObj.StatusCode()),
slog.String("status", responseObj.Status()),
slog.String("protocol", responseObj.Proto()),
slog.Duration("time", responseObj.Time()),
slog.String("body", string(responseObj.Body())))

if responseObj.IsError() {
return response, &apiError{Code: responseObj.StatusCode(), Message: responseObj.Status()}
}

return response, nil
}

func GetSensor(id string) (*sensor.Entity, error) {
details, err := tracker.Get(id)
details, err := sensorTracker.Get(id)
if err != nil {
return nil, fmt.Errorf("could not get sensor details: %w", err)
}
Expand All @@ -404,7 +252,7 @@ func GetSensor(id string) (*sensor.Entity, error) {
}

func SensorList() []string {
return tracker.SensorList()
return sensorTracker.SensorList()
}

// sensorLogAttrs is a convienience function that returns some slog attributes
Expand Down
Loading

0 comments on commit a8cd590

Please sign in to comment.