Skip to content

Commit

Permalink
Improve notification storage
Browse files Browse the repository at this point in the history
  • Loading branch information
sesposito committed Aug 9, 2023
1 parent b269fb3 commit b206f04
Showing 1 changed file with 36 additions and 30 deletions.
66 changes: 36 additions & 30 deletions server/core_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"encoding/base64"
"encoding/gob"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -72,15 +74,17 @@ func NotificationSend(ctx context.Context, logger *zap.Logger, db *sql.DB, messa
}

// Deliver live notifications to connected users.
for userID, ns := range notifications {
messageRouter.SendToStream(logger, PresenceStream{Mode: StreamModeNotifications, Subject: userID}, &rtapi.Envelope{
Message: &rtapi.Envelope_Notifications{
Notifications: &rtapi.Notifications{
Notifications: ns,
go func() {
for userID, ns := range notifications {
messageRouter.SendToStream(logger, PresenceStream{Mode: StreamModeNotifications, Subject: userID}, &rtapi.Envelope{
Message: &rtapi.Envelope_Notifications{
Notifications: &rtapi.Notifications{
Notifications: ns,
},
},
},
}, true)
}
}, true)
}
}()

return nil
}
Expand Down Expand Up @@ -283,36 +287,38 @@ func NotificationDelete(ctx context.Context, logger *zap.Logger, db *sql.DB, use
}

func NotificationSave(ctx context.Context, logger *zap.Logger, db *sql.DB, notifications map[uuid.UUID][]*api.Notification) error {
statements := make([]string, 0, len(notifications))
params := make([]interface{}, 0, len(notifications))
counter := 0
rows := make([][]interface{}, 0, len(notifications))
for userID, no := range notifications {
for _, un := range no {
statement := "$" + strconv.Itoa(counter+1) +
",$" + strconv.Itoa(counter+2) +
",$" + strconv.Itoa(counter+3) +
",$" + strconv.Itoa(counter+4) +
",$" + strconv.Itoa(counter+5) +
",$" + strconv.Itoa(counter+6)

counter = counter + 6
statements = append(statements, "("+statement+")")

params = append(params, un.Id)
params = append(params, userID)
params = append(params, un.Subject)
params = append(params, un.Content)
params = append(params, un.Code)
params = append(params, un.SenderId)
rows = append(rows, []interface{}{un.Id, userID, un.Subject, un.Content, un.Code, un.SenderId})
}
}

query := "INSERT INTO notification (id, user_id, subject, content, code, sender_id) VALUES " + strings.Join(statements, ", ")
conn, err := db.Conn(ctx)
if err != nil {
logger.Error("Failed to get db conn", zap.Error(err))
return err
}
if err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()

_, err := pgxConn.CopyFrom(
ctx,
pgx.Identifier{"notification"},
[]string{"id", "user_id", "subject", "content", "code", "sender_id"},
pgx.CopyFromRows(rows),
)
if err != nil {
return err
}

if _, err := db.ExecContext(ctx, query, params...); err != nil {
logger.Error("Could not save notifications.", zap.Error(err))
return nil
}); err != nil {
conn.Close()
logger.Error("Failed to get pgx db conn", zap.Error(err))
return err
}
conn.Close()

return nil
}

0 comments on commit b206f04

Please sign in to comment.