Skip to content

Commit

Permalink
chore: remove usused pg_notify code (#2574)
Browse files Browse the repository at this point in the history
closes #2156

- pubsub was never using the pg_notify events, but the controller was
listening for them anyway
- deployment updates we switched to a polling approach and seems to be
working fine.
- This PR cleans up the code and sql that is no longer being used.

@alecthomas Feel free to reject this PR if you want to keep this
streaming code because you think it's worth bringing it back in the
future.
  • Loading branch information
matt2e authored Sep 2, 2024
1 parent b984f95 commit c5e8e40
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 149 deletions.
149 changes: 0 additions & 149 deletions backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ import (
"context"
"crypto/sha256"
"encoding"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/alecthomas/types/optional"
"github.com/jackc/pgx/v5"
"github.com/jpillora/backoff"

"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)
Expand Down Expand Up @@ -43,16 +39,6 @@ func (n Notification[T, Key, KeyP]) String() string {
// DeploymentNotification is a notification from the database when a deployment changes.
type DeploymentNotification = Notification[Deployment, model.DeploymentKey, *model.DeploymentKey]

// See JSON structure in SQL schema
type event struct {
Table string `json:"table"`
Action string `json:"action"`
New string `json:"new,omitempty"`
Old string `json:"old,omitempty"`
// Optional field for conveying deletion metadata.
Deleted json.RawMessage `json:"deleted,omitempty"`
}

type deploymentState struct {
Key model.DeploymentKey
schemaHash []byte
Expand Down Expand Up @@ -138,138 +124,3 @@ func (d *DAL) PollDeployments(ctx context.Context) {
}
}
}

func (d *DAL) runListener(ctx context.Context, conn *pgx.Conn) {
defer conn.Close(ctx)
logger := log.FromContext(ctx)

logger.Debugf("Starting DB listener")

// Wait for the notification channel to be ready.
retry := backoff.Backoff{}
channels := []string{"deployments_events", "topics_events", "topic_events_events"}
for _, channel := range channels {
for {
_, err := conn.Exec(ctx, "LISTEN "+channel)
if err == nil {
logger.Debugf("Listening to channel: %s", channel)
retry.Reset()
break
}
logger.Errorf(err, "Failed to LISTEN to %s", channel)
time.Sleep(retry.Duration())
}
}

// Main loop for listening to notifications.
for {
delay := time.Millisecond * 100
logger.Tracef("Waiting for notification")
notification, err := waitForNotification(ctx, conn)
if err == nil {
logger.Tracef("Publishing notification: %s", notification)
err = d.publishNotification(ctx, notification, logger)
}
if err != nil {
logger.Errorf(err, "Failed to receive notification")
delay = retry.Duration()
} else {
retry.Reset()
}
select {
case <-ctx.Done():
return

case <-time.After(delay):
}
}
}

func (d *DAL) publishNotification(ctx context.Context, notification event, logger *log.Logger) error {
switch notification.Table {
case "deployments":
deployment, err := decodeNotification(notification, func(key model.DeploymentKey) (Deployment, optional.Option[model.DeploymentKey], error) {
row, err := d.db.GetDeployment(ctx, key)
if err != nil {
return Deployment{}, optional.None[model.DeploymentKey](), libdal.TranslatePGError(err)
}
return Deployment{
CreatedAt: row.Deployment.CreatedAt,
Key: row.Deployment.Key,
Module: row.ModuleName,
Schema: row.Deployment.Schema,
MinReplicas: int(row.Deployment.MinReplicas),
Language: row.Language,
}, optional.None[model.DeploymentKey](), nil
})
if err != nil {
return err
}
logger.Tracef("Deployment notification: %s", deployment)
d.DeploymentChanges.Publish(deployment)

case "topics":
// TODO: handle topics notifications
case "topic_events":
// TODO: handle topic events notifications
default:
panic(fmt.Sprintf("unknown table %q in DB notification", notification.Table))
}
return nil
}

// This function takes a notification from the database and translates it into
// a concrete Notification value.
//
// The translate function is called to translate the key into a concrete value
// OR a delete notification.
func decodeNotification[K any, T NotificationPayload, KP interface {
*K
encoding.TextUnmarshaler
}](notification event, translate func(key K) (T, optional.Option[K], error)) (Notification[T, K, KP], error) {
var (
deleted optional.Option[K]
message optional.Option[T]
)
if notification.Action == "DELETE" {
var deletedKey K
var deletedKeyP KP = &deletedKey
if err := deletedKeyP.UnmarshalText([]byte(notification.Old)); err != nil {
return Notification[T, K, KP]{}, fmt.Errorf("failed to unmarshal notification key: %w", err)
}
deleted = optional.Some(deletedKey)
} else {
var newKey K
var newKeyP KP = &newKey
if err := newKeyP.UnmarshalText([]byte(notification.New)); err != nil {
return Notification[T, K, KP]{}, fmt.Errorf("failed to unmarshal notification key: %w", err)
}
var msg T
var err error
msg, deleted, err = translate(newKey)
if err != nil {
return Notification[T, K, KP]{}, fmt.Errorf("failed to translate database notification: %w", err)
}

if !deleted.Ok() {
message = optional.Some(msg)
}
}

return Notification[T, K, KP]{Deleted: deleted, Message: message}, nil
}

func waitForNotification(ctx context.Context, conn *pgx.Conn) (event, error) {
notification, err := conn.WaitForNotification(ctx)
if err != nil {
return event{}, err
}
ev := event{}
dec := json.NewDecoder(strings.NewReader(notification.Payload))
dec.DisallowUnknownFields()
err = dec.Decode(&ev)
if err != nil {
return event{}, err
}
return ev, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- migrate:up

DROP TRIGGER IF EXISTS deployments_notify_event ON deployments;
DROP TRIGGER IF EXISTS topics_notify_event ON topics;
DROP TRIGGER IF EXISTS topic_events_notify_event ON topic_events;
DROP FUNCTION IF EXISTS notify_event();

-- migrate:down

0 comments on commit c5e8e40

Please sign in to comment.