Skip to content

Commit

Permalink
Replace INSERT batch with pgx.Batch with single row insert
Browse files Browse the repository at this point in the history
  • Loading branch information
ftkg committed Oct 23, 2023
1 parent dce82f8 commit 6c376dd
Showing 1 changed file with 36 additions and 44 deletions.
80 changes: 36 additions & 44 deletions server/core_purchase.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/gofrs/uuid/v5"
"github.com/heroiclabs/nakama-common/api"
"github.com/heroiclabs/nakama/v3/iap"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -558,25 +558,8 @@ func upsertPurchases(ctx context.Context, db *sql.DB, purchases []*storagePurcha
return nil, errors.New("expects at least one receipt")
}

statements := make([]string, 0, len(purchases))
params := make([]interface{}, 0, len(purchases)*8)
transactionIDsToPurchase := make(map[string]*storagePurchase)
offset := 0
for _, purchase := range purchases {
if purchase.refundTime.IsZero() {
purchase.refundTime = time.Unix(0, 0)
}
if purchase.rawResponse == "" {
purchase.rawResponse = "{}"
}

statement := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", offset+1, offset+2, offset+3, offset+4, offset+5, offset+6, offset+7, offset+8)
offset += 8
statements = append(statements, statement)
params = append(params, purchase.userID, purchase.store, purchase.transactionId, purchase.productId, purchase.purchaseTime, purchase.rawResponse, purchase.environment, purchase.refundTime)
transactionIDsToPurchase[purchase.transactionId] = purchase
}

batch := &pgx.Batch{}
query := `
INSERT
INTO
Expand All @@ -591,8 +574,7 @@ INTO
environment,
refund_time
)
VALUES
` + strings.Join(statements, ", ") + `
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT
(transaction_id)
DO UPDATE SET
Expand All @@ -605,31 +587,41 @@ RETURNING
update_time,
refund_time
`
rows, err := db.QueryContext(ctx, query, params...)
if err != nil {
return nil, err
}
for rows.Next() {
// Newly inserted purchases
var dbUserID uuid.UUID
var transactionId string
var createTime pgtype.Timestamptz
var updateTime pgtype.Timestamptz
var refundTime pgtype.Timestamptz
if err = rows.Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil {
_ = rows.Close()
return nil, err

for _, purchase := range purchases {
if purchase.refundTime.IsZero() {
purchase.refundTime = time.Unix(0, 0)
}
storedPurchase := transactionIDsToPurchase[transactionId]
storedPurchase.createTime = createTime.Time
storedPurchase.updateTime = updateTime.Time
storedPurchase.seenBefore = updateTime.Time.After(createTime.Time)
if refundTime.Time.Unix() != 0 {
storedPurchase.refundTime = refundTime.Time
if purchase.rawResponse == "" {
purchase.rawResponse = "{}"
}
}
_ = rows.Close()
if err := rows.Err(); err != nil {
transactionIDsToPurchase[purchase.transactionId] = purchase
batch.Queue(query, purchase.userID, purchase.store, purchase.transactionId, purchase.productId, purchase.purchaseTime, purchase.rawResponse, purchase.environment, purchase.refundTime)
}

if err := ExecuteInTxPgx(ctx, db, func(tx pgx.Tx) error {
br := tx.SendBatch(ctx, batch)
defer br.Close()
for range purchases {
// Newly inserted purchases
var dbUserID uuid.UUID
var transactionId string
var createTime pgtype.Timestamptz
var updateTime pgtype.Timestamptz
var refundTime pgtype.Timestamptz
if err := br.QueryRow().Scan(&dbUserID, &transactionId, &createTime, &updateTime, &refundTime); err != nil {
return err
}
storedPurchase := transactionIDsToPurchase[transactionId]
storedPurchase.createTime = createTime.Time
storedPurchase.updateTime = updateTime.Time
storedPurchase.seenBefore = updateTime.Time.After(createTime.Time)
if refundTime.Time.Unix() != 0 {
storedPurchase.refundTime = refundTime.Time
}
}
return br.Close()
}); err != nil {
return nil, err
}

Expand Down

0 comments on commit 6c376dd

Please sign in to comment.