Skip to content

Commit

Permalink
Replace batch with single unnest batch.
Browse files Browse the repository at this point in the history
  • Loading branch information
ftkg committed Oct 26, 2023
1 parent fd1bde7 commit 18dc6ad
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 72 deletions.
41 changes: 24 additions & 17 deletions server/core_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama-common/rtapi"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -274,27 +273,35 @@ func NotificationDelete(ctx context.Context, logger *zap.Logger, db *sql.DB, use
}

func NotificationSave(ctx context.Context, logger *zap.Logger, db *sql.DB, notifications map[uuid.UUID][]*api.Notification) error {
batch := &pgx.Batch{}
query := "INSERT INTO notification (id, user_id, subject, content, code, sender_id) VALUES ($1, $2, $3, $4, $5, $6)"
ids := make([]string, 0, len(notifications))
userIds := make([]uuid.UUID, 0, len(notifications))
subjects := make([]string, 0, len(notifications))
contents := make([]string, 0, len(notifications))
codes := make([]int32, 0, len(notifications))
senderIds := make([]string, 0, len(notifications))
query := `
INSERT INTO
notification (id, user_id, subject, content, code, sender_id)
SELECT
unnest($1::uuid[]),
unnest($2::uuid[]),
unnest($3::text[]),
unnest($4::jsonb[]),
unnest($5::smallint[]),
unnest($6::uuid[]);
`
for userID, no := range notifications {
for _, un := range no {
batch.Queue(query, un.Id, userID, un.Subject, un.Content, un.Code, un.SenderId)
ids = append(ids, un.Id)
userIds = append(userIds, userID)
subjects = append(subjects, un.Subject)
contents = append(contents, un.Content)
codes = append(codes, un.Code)
senderIds = append(senderIds, un.SenderId)
}
}

if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
br := tx.SendBatch(ctx, batch)
defer br.Close()
for _, no := range notifications {
for range no {
_, err := br.Exec()
if err != nil {
return err
}
}
}
return br.Close()
}); err != nil {
if _, err := db.ExecContext(ctx, query, ids, userIds, subjects, contents, codes, senderIds); err != nil {
logger.Error("Could not save notifications.", zap.Error(err))
return err
}
Expand Down
125 changes: 83 additions & 42 deletions server/core_purchase.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama/v3/iap"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -559,26 +558,32 @@ func upsertPurchases(ctx context.Context, db *sql.DB, purchases []*storagePurcha
}

transactionIDsToPurchase := make(map[string]*storagePurchase)
batch := &pgx.Batch{}

userIdParams := make([]uuid.UUID, 0, len(purchases))
storeParams := make([]api.StoreProvider, 0, len(purchases))
transactionIdParams := make([]string, 0, len(purchases))
productIdParams := make([]string, 0, len(purchases))
purchaseTimeParams := make([]time.Time, 0, len(purchases))
rawResponseParams := make([]string, 0, len(purchases))
environmentParams := make([]api.StoreEnvironment, 0, len(purchases))
refundTimeParams := make([]time.Time, 0, len(purchases))
query := `
INSERT
INTO
purchase
(
user_id,
store,
transaction_id,
product_id,
purchase_time,
raw_response,
environment,
refund_time
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO purchase
(
user_id,
store,
transaction_id,
product_id,
purchase_time,
raw_response,
environment,
refund_time
)
SELECT unnest($1::uuid[]), unnest($2::smallint[]), unnest($3::text[]), unnest($4::text[]), unnest($5::timestamptz[]), unnest($6::jsonb[]), unnest($7::smallint[]), unnest($8::timestamptz[])
ON CONFLICT
(transaction_id)
DO UPDATE SET
refund_time = $8,
refund_time = EXCLUDED.refund_time,
update_time = now()
RETURNING
user_id,
Expand All @@ -596,35 +601,71 @@ RETURNING
purchase.rawResponse = "{}"
}
transactionIDsToPurchase[purchase.transactionId] = purchase
batch.Queue(query, purchase.userID, purchase.store, purchase.transactionId, purchase.productId, purchase.purchaseTime, purchase.rawResponse, purchase.environment, purchase.refundTime)
}

if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
br := tx.SendBatch(ctx, batch)
defer br.Close()
for range purchases {
// Newly inserted purchases
var dbUserID uuid.UUID
var transactionId string
var createTime pgtype.Timestamptz
var updateTime pgtype.Timestamptz
var refundTime pgtype.Timestamptz
if err := br.QueryRow().Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil {
return err
}
storedPurchase := transactionIDsToPurchase[transactionId]
storedPurchase.createTime = createTime.Time
storedPurchase.updateTime = updateTime.Time
storedPurchase.seenBefore = updateTime.Time.After(createTime.Time)
if refundTime.Time.Unix() != 0 {
storedPurchase.refundTime = refundTime.Time
}
//batch.Queue(query, purchase.userID, purchase.store, purchase.transactionId, purchase.productId, purchase.purchaseTime, purchase.rawResponse, purchase.environment, purchase.refundTime)
userIdParams = append(userIdParams, purchase.userID)
storeParams = append(storeParams, purchase.store)
transactionIdParams = append(transactionIdParams, purchase.transactionId)
productIdParams = append(productIdParams, purchase.productId)
purchaseTimeParams = append(purchaseTimeParams, purchase.purchaseTime)
rawResponseParams = append(rawResponseParams, purchase.rawResponse)
environmentParams = append(environmentParams, purchase.environment)
refundTimeParams = append(refundTimeParams, purchase.refundTime)
}

rows, err := db.QueryContext(ctx, query, userIdParams, storeParams, transactionIdParams, productIdParams, purchaseTimeParams, rawResponseParams, environmentParams, refundTimeParams)
if err != nil {
return nil, err
}
for rows.Next() {
// Newly inserted purchases
var dbUserID uuid.UUID
var transactionId string
var createTime pgtype.Timestamptz
var updateTime pgtype.Timestamptz
var refundTime pgtype.Timestamptz
if err = rows.Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil {
_ = rows.Close()
return nil, err
}
return br.Close()
}); err != nil {
storedPurchase := transactionIDsToPurchase[transactionId]
storedPurchase.createTime = createTime.Time
storedPurchase.updateTime = updateTime.Time
storedPurchase.seenBefore = updateTime.Time.After(createTime.Time)
if refundTime.Time.Unix() != 0 {
storedPurchase.refundTime = refundTime.Time
}
}
_ = rows.Close()
if err := rows.Err(); err != nil {
return nil, err
}

//if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
// br := tx.SendBatch(ctx, batch)
// defer br.Close()
// for range purchases {
// // Newly inserted purchases
// var dbUserID uuid.UUID
// var transactionId string
// var createTime pgtype.Timestamptz
// var updateTime pgtype.Timestamptz
// var refundTime pgtype.Timestamptz
// if err := br.QueryRow().Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil {
// return err
// }
// storedPurchase := transactionIDsToPurchase[transactionId]
// storedPurchase.createTime = createTime.Time
// storedPurchase.updateTime = updateTime.Time
// storedPurchase.seenBefore = updateTime.Time.After(createTime.Time)
// if refundTime.Time.Unix() != 0 {
// storedPurchase.refundTime = refundTime.Time
// }
// }
// return br.Close()
//}); err != nil {
// return nil, err
//}

storedPurchases := make([]*storagePurchase, 0, len(transactionIDsToPurchase))
for _, purchase := range transactionIDsToPurchase {
storedPurchases = append(storedPurchases, purchase)
Expand Down
38 changes: 25 additions & 13 deletions server/core_wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,17 @@ func updateWallets(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates [
// Prepare the set of wallet updates and ledger updates.
updatedWallets := make(map[string][]byte, len(updates))
updateOrder := make([]string, 0, len(updates))
batchLedger := &pgx.Batch{}
queryLedger := "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES ($1, $2, $3, $4)"

var idParams []uuid.UUID
var userIdParams []string
var changesetParams [][]byte
var metadataParams []string
if updateLedger {
idParams = make([]uuid.UUID, 0, len(updates))
userIdParams = make([]string, 0, len(updates))
changesetParams = make([][]byte, 0, len(updates))
metadataParams = make([]string, 0, len(updates))
}

// Go through the changesets and attempt to calculate the new state for each wallet.
for _, update := range updates {
Expand Down Expand Up @@ -208,7 +217,10 @@ func updateWallets(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates [
return nil, err
}

batchLedger.Queue(queryLedger, uuid.Must(uuid.NewV4()), userID, changesetData, update.Metadata)
idParams = append(idParams, uuid.Must(uuid.NewV4()))
userIdParams = append(userIdParams, userID)
changesetParams = append(changesetParams, changesetData)
metadataParams = append(metadataParams, update.Metadata)
}
}

Expand All @@ -232,17 +244,17 @@ func updateWallets(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates [
}

// Write the ledger updates, if any.
if updateLedger && (batchLedger.Len() > 0) {
br := tx.SendBatch(ctx, batchLedger)
defer br.Close()
for range updates {
_, err := br.Exec()
if err != nil {
logger.Debug("Error writing user wallet ledgers.", zap.Error(err))
return nil, err
}
if updateLedger && (len(idParams) > 0) {
_, err = tx.Exec(ctx, `
INSERT INTO
wallet_ledger (id, user_id, changeset, metadata)
SELECT
unnest($1::uuid[]), unnest($2::uuid[]), unnest($3::jsonb[]), unnest($4::jsonb[]);
`, idParams, userIdParams, changesetParams, metadataParams)
if err != nil {
logger.Debug("Error writing user wallet ledgers.", zap.Error(err))
return nil, err
}
br.Close()
}
}

Expand Down

0 comments on commit 18dc6ad

Please sign in to comment.