From 18dc6ad47b47ef9164b39234ce789571c3595706 Mon Sep 17 00:00:00 2001 From: Fernando Takagi Date: Thu, 26 Oct 2023 11:12:19 -0300 Subject: [PATCH] Replace batch with single unnest batch. --- server/core_notification.go | 41 +++++++----- server/core_purchase.go | 125 ++++++++++++++++++++++++------------ server/core_wallet.go | 38 +++++++---- 3 files changed, 132 insertions(+), 72 deletions(-) diff --git a/server/core_notification.go b/server/core_notification.go index 914c22ef65..cddafba179 100644 --- a/server/core_notification.go +++ b/server/core_notification.go @@ -27,7 +27,6 @@ import ( "github.com/heroiclabs/nakama-common/api" "github.com/heroiclabs/nakama-common/rtapi" "github.com/jackc/pgtype" - "github.com/jackc/pgx/v4" "go.uber.org/zap" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -274,27 +273,35 @@ func NotificationDelete(ctx context.Context, logger *zap.Logger, db *sql.DB, use } func NotificationSave(ctx context.Context, logger *zap.Logger, db *sql.DB, notifications map[uuid.UUID][]*api.Notification) error { - batch := &pgx.Batch{} - query := "INSERT INTO notification (id, user_id, subject, content, code, sender_id) VALUES ($1, $2, $3, $4, $5, $6)" + ids := make([]string, 0, len(notifications)) + userIds := make([]uuid.UUID, 0, len(notifications)) + subjects := make([]string, 0, len(notifications)) + contents := make([]string, 0, len(notifications)) + codes := make([]int32, 0, len(notifications)) + senderIds := make([]string, 0, len(notifications)) + query := ` +INSERT INTO + notification (id, user_id, subject, content, code, sender_id) +SELECT + unnest($1::uuid[]), + unnest($2::uuid[]), + unnest($3::text[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::uuid[]); +` for userID, no := range notifications { for _, un := range no { - batch.Queue(query, un.Id, userID, un.Subject, un.Content, un.Code, un.SenderId) + ids = append(ids, un.Id) + userIds = append(userIds, userID) + subjects = append(subjects, un.Subject) + contents = append(contents, un.Content) + codes = append(codes, un.Code) + senderIds = append(senderIds, un.SenderId) } } - if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error { - br := tx.SendBatch(ctx, batch) - defer br.Close() - for _, no := range notifications { - for range no { - _, err := br.Exec() - if err != nil { - return err - } - } - } - return br.Close() - }); err != nil { + if _, err := db.ExecContext(ctx, query, ids, userIds, subjects, contents, codes, senderIds); err != nil { logger.Error("Could not save notifications.", zap.Error(err)) return err } diff --git a/server/core_purchase.go b/server/core_purchase.go index 4035ccae10..b93af162fe 100644 --- a/server/core_purchase.go +++ b/server/core_purchase.go @@ -30,7 +30,6 @@ import ( "github.com/heroiclabs/nakama-common/api" "github.com/heroiclabs/nakama/v3/iap" "github.com/jackc/pgtype" - "github.com/jackc/pgx/v4" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -559,26 +558,32 @@ func upsertPurchases(ctx context.Context, db *sql.DB, purchases []*storagePurcha } transactionIDsToPurchase := make(map[string]*storagePurchase) - batch := &pgx.Batch{} + + userIdParams := make([]uuid.UUID, 0, len(purchases)) + storeParams := make([]api.StoreProvider, 0, len(purchases)) + transactionIdParams := make([]string, 0, len(purchases)) + productIdParams := make([]string, 0, len(purchases)) + purchaseTimeParams := make([]time.Time, 0, len(purchases)) + rawResponseParams := make([]string, 0, len(purchases)) + environmentParams := make([]api.StoreEnvironment, 0, len(purchases)) + refundTimeParams := make([]time.Time, 0, len(purchases)) query := ` -INSERT -INTO - purchase - ( - user_id, - store, - transaction_id, - product_id, - purchase_time, - raw_response, - environment, - refund_time - ) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +INSERT INTO purchase + ( + user_id, + store, + transaction_id, + product_id, + purchase_time, + raw_response, + environment, + refund_time + ) +SELECT unnest($1::uuid[]), unnest($2::smallint[]), unnest($3::text[]), unnest($4::text[]), unnest($5::timestamptz[]), unnest($6::jsonb[]), unnest($7::smallint[]), unnest($8::timestamptz[]) ON CONFLICT (transaction_id) DO UPDATE SET - refund_time = $8, + refund_time = EXCLUDED.refund_time, update_time = now() RETURNING user_id, @@ -596,35 +601,71 @@ RETURNING purchase.rawResponse = "{}" } transactionIDsToPurchase[purchase.transactionId] = purchase - batch.Queue(query, purchase.userID, purchase.store, purchase.transactionId, purchase.productId, purchase.purchaseTime, purchase.rawResponse, purchase.environment, purchase.refundTime) - } - - if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error { - br := tx.SendBatch(ctx, batch) - defer br.Close() - for range purchases { - // Newly inserted purchases - var dbUserID uuid.UUID - var transactionId string - var createTime pgtype.Timestamptz - var updateTime pgtype.Timestamptz - var refundTime pgtype.Timestamptz - if err := br.QueryRow().Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil { - return err - } - storedPurchase := transactionIDsToPurchase[transactionId] - storedPurchase.createTime = createTime.Time - storedPurchase.updateTime = updateTime.Time - storedPurchase.seenBefore = updateTime.Time.After(createTime.Time) - if refundTime.Time.Unix() != 0 { - storedPurchase.refundTime = refundTime.Time - } + //batch.Queue(query, purchase.userID, purchase.store, purchase.transactionId, purchase.productId, purchase.purchaseTime, purchase.rawResponse, purchase.environment, purchase.refundTime) + userIdParams = append(userIdParams, purchase.userID) + storeParams = append(storeParams, purchase.store) + transactionIdParams = append(transactionIdParams, purchase.transactionId) + productIdParams = append(productIdParams, purchase.productId) + purchaseTimeParams = append(purchaseTimeParams, purchase.purchaseTime) + rawResponseParams = append(rawResponseParams, purchase.rawResponse) + environmentParams = append(environmentParams, purchase.environment) + refundTimeParams = append(refundTimeParams, purchase.refundTime) + } + + rows, err := db.QueryContext(ctx, query, userIdParams, storeParams, transactionIdParams, productIdParams, purchaseTimeParams, rawResponseParams, environmentParams, refundTimeParams) + if err != nil { + return nil, err + } + for rows.Next() { + // Newly inserted purchases + var dbUserID uuid.UUID + var transactionId string + var createTime pgtype.Timestamptz + var updateTime pgtype.Timestamptz + var refundTime pgtype.Timestamptz + if err = rows.Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil { + _ = rows.Close() + return nil, err } - return br.Close() - }); err != nil { + storedPurchase := transactionIDsToPurchase[transactionId] + storedPurchase.createTime = createTime.Time + storedPurchase.updateTime = updateTime.Time + storedPurchase.seenBefore = updateTime.Time.After(createTime.Time) + if refundTime.Time.Unix() != 0 { + storedPurchase.refundTime = refundTime.Time + } + } + _ = rows.Close() + if err := rows.Err(); err != nil { return nil, err } + //if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error { + // br := tx.SendBatch(ctx, batch) + // defer br.Close() + // for range purchases { + // // Newly inserted purchases + // var dbUserID uuid.UUID + // var transactionId string + // var createTime pgtype.Timestamptz + // var updateTime pgtype.Timestamptz + // var refundTime pgtype.Timestamptz + // if err := br.QueryRow().Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil { + // return err + // } + // storedPurchase := transactionIDsToPurchase[transactionId] + // storedPurchase.createTime = createTime.Time + // storedPurchase.updateTime = updateTime.Time + // storedPurchase.seenBefore = updateTime.Time.After(createTime.Time) + // if refundTime.Time.Unix() != 0 { + // storedPurchase.refundTime = refundTime.Time + // } + // } + // return br.Close() + //}); err != nil { + // return nil, err + //} + storedPurchases := make([]*storagePurchase, 0, len(transactionIDsToPurchase)) for _, purchase := range transactionIDsToPurchase { storedPurchases = append(storedPurchases, purchase) diff --git a/server/core_wallet.go b/server/core_wallet.go index c474d4932c..9ffd8fda60 100644 --- a/server/core_wallet.go +++ b/server/core_wallet.go @@ -155,8 +155,17 @@ func updateWallets(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates [ // Prepare the set of wallet updates and ledger updates. updatedWallets := make(map[string][]byte, len(updates)) updateOrder := make([]string, 0, len(updates)) - batchLedger := &pgx.Batch{} - queryLedger := "INSERT INTO wallet_ledger (id, user_id, changeset, metadata) VALUES ($1, $2, $3, $4)" + + var idParams []uuid.UUID + var userIdParams []string + var changesetParams [][]byte + var metadataParams []string + if updateLedger { + idParams = make([]uuid.UUID, 0, len(updates)) + userIdParams = make([]string, 0, len(updates)) + changesetParams = make([][]byte, 0, len(updates)) + metadataParams = make([]string, 0, len(updates)) + } // Go through the changesets and attempt to calculate the new state for each wallet. for _, update := range updates { @@ -208,7 +217,10 @@ func updateWallets(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates [ return nil, err } - batchLedger.Queue(queryLedger, uuid.Must(uuid.NewV4()), userID, changesetData, update.Metadata) + idParams = append(idParams, uuid.Must(uuid.NewV4())) + userIdParams = append(userIdParams, userID) + changesetParams = append(changesetParams, changesetData) + metadataParams = append(metadataParams, update.Metadata) } } @@ -232,17 +244,17 @@ func updateWallets(ctx context.Context, logger *zap.Logger, tx pgx.Tx, updates [ } // Write the ledger updates, if any. - if updateLedger && (batchLedger.Len() > 0) { - br := tx.SendBatch(ctx, batchLedger) - defer br.Close() - for range updates { - _, err := br.Exec() - if err != nil { - logger.Debug("Error writing user wallet ledgers.", zap.Error(err)) - return nil, err - } + if updateLedger && (len(idParams) > 0) { + _, err = tx.Exec(ctx, ` +INSERT INTO + wallet_ledger (id, user_id, changeset, metadata) +SELECT + unnest($1::uuid[]), unnest($2::uuid[]), unnest($3::jsonb[]), unnest($4::jsonb[]); +`, idParams, userIdParams, changesetParams, metadataParams) + if err != nil { + logger.Debug("Error writing user wallet ledgers.", zap.Error(err)) + return nil, err } - br.Close() } }