Skip to content

Commit

Permalink
Testing and performance check for api with postgresql (#1809)
Browse files Browse the repository at this point in the history
* Fix sql query for operations endpoint

* Fix vaa_id column name to message_id in pipeline

* Fix id on conflict for wh_operation_transactions insertion

* fix wh_governor_config update column chains

* normalize tx_hash when fetch tx details fail (#1807)

* Add source_event and track_id_event columns to source event tracking (#1808)

Add source_event and track_id_event columns to source event tracking in analytics, parser, pipeline and tx-tracker processing
Improve column and table names

---------

Co-authored-by: Agustin Pazos <[email protected]>
  • Loading branch information
ftocal and walker-16 committed Oct 21, 2024
1 parent d571734 commit 1de45fe
Show file tree
Hide file tree
Showing 33 changed files with 221 additions and 197 deletions.
3 changes: 3 additions & 0 deletions analytics/cmd/prices/vaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package prices

import (
"context"
"fmt"
"time"

"github.com/shopspring/decimal"
Expand Down Expand Up @@ -139,6 +140,8 @@ func RunVaasPrices(cfg VaasPrices) {
},
transferredToken,
tokenProvider,
"backfiller",
fmt.Sprintf("backfiller-%s", vaa.MessageID()),
); err != nil {
logger.Error("Failed to upsert transfer prices", zap.String("id", v.ID), zap.Error(err))
}
Expand Down
5 changes: 4 additions & 1 deletion analytics/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ func (c *Consumer) Start(ctx context.Context) {
}

// push vaa metrics.
err = c.pushMetric(ctx, &metric.Params{TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned})
err = c.pushMetric(
ctx,
&metric.Params{Source: event.Source, TrackID: event.TrackID, Vaa: vaa, VaaIsSigned: event.VaaIsSigned},
)
if err != nil {
msg.Failed()
c.metrics.IncUnprocessedMessage(chainID, event.Source, msg.Retry())
Expand Down
2 changes: 1 addition & 1 deletion analytics/http/vaa/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c *Controller) PushVAAMetrics(ctx *fiber.Ctx) error {
}

trackID := fmt.Sprintf("controller-%s", vaa.MessageID())
err = c.pushMetric(ctx.Context(), &metric.Params{TrackID: trackID, Vaa: vaa})
err = c.pushMetric(ctx.Context(), &metric.Params{Source: "controller", TrackID: trackID, Vaa: vaa})
if err != nil {
c.logger.Error("Error pushing metric", zap.Error(err))
return err
Expand Down
2 changes: 2 additions & 0 deletions analytics/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (m *Metric) Push(ctx context.Context, params *Params) error {
},
transferredToken.Clone(),
m.tokenProvider,
params.Source,
params.TrackID,
)

} else {
Expand Down
3 changes: 3 additions & 0 deletions analytics/metric/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func UpsertTransferPrices(
tokenPriceFunc func(tokenID, coinGeckoID string, timestamp time.Time) (decimal.Decimal, error),
transferredToken *token.TransferredToken,
tokenProvider *domain.TokenProvider,
source, trackID string,
) error {

// Do not generate this metric for PythNet VAAs
Expand Down Expand Up @@ -69,6 +70,8 @@ func UpsertTransferPrices(
usdAmount := tokenAmount.Mul(notionalUSD)

tp := storage.OperationPrice{
Source: source,
TrackID: trackID,
Digest: utils.NormalizeHex(vaa.HexDigest()),
VaaID: vaa.MessageID(),
ChainID: vaa.EmitterChain,
Expand Down
1 change: 1 addition & 0 deletions analytics/metric/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

type Params struct {
Source string
TrackID string
Vaa *vaa.VAA
VaaIsSigned bool
Expand Down
8 changes: 5 additions & 3 deletions analytics/storage/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func NewPostgresRepository(db *db.DB, metrics metrics.Metrics, logger *zap.Logge
func (r *PostgresPricesRepository) Upsert(ctx context.Context, op OperationPrice) error {
query := `
INSERT INTO wormholescan.wh_operation_prices
(id, message_id, token_chain_id, token_address, coingecko_id, symbol, token_usd_price, total_token, total_usd, "timestamp", created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
(id, message_id, token_chain_id, token_address, coingecko_id, symbol, token_usd_price, total_token, total_usd, "timestamp", created_at, updated_at, source_event, track_id_event)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(id) DO UPDATE
SET updated_at = $12
RETURNING updated_at;
Expand All @@ -53,7 +53,9 @@ func (r *PostgresPricesRepository) Upsert(ctx context.Context, op OperationPrice
op.TotalUSD,
op.Timestamp,
now,
now)
now,
op.Source,
op.TrackID)

if err == nil {
r.metrics.IncOperationPriceInserted(op.ChainID)
Expand Down
2 changes: 2 additions & 0 deletions analytics/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type PricesRepository interface {
}

type OperationPrice struct {
Source string
TrackID string
Digest string
VaaID string
ChainID sdk.ChainID
Expand Down
2 changes: 1 addition & 1 deletion api/handlers/observations/postgres_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (q *ObservationQuery) toQuery() (string, []any) {

query := "SELECT obs.* FROM wormholescan.wh_observations obs \n"
if hasTxHash {
query += "JOIN wormholescan.wh_operation_transactions ot ON obs.hash = ot.attestation_vaas_id \n"
query += "JOIN wormholescan.wh_operation_transactions ot ON obs.hash = ot.attestation_id \n"
}

query += fmt.Sprintf("WHERE %s \n", where)
Expand Down
2 changes: 1 addition & 1 deletion api/handlers/observations/postgres_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestPostgresRepository_toQueryWithTxHash(t *testing.T) {
assert.Equal(t, 1, len(params))
assert.Equal(t,
`SELECT obs.* FROM wormholescan.wh_observations obs
JOIN wormholescan.wh_operation_transactions ot ON obs.hash = ot.attestation_vaas_id
JOIN wormholescan.wh_operation_transactions ot ON obs.hash = ot.attestation_id
WHERE ot.tx_hash = $1
ORDER BY obs.created_at DESC
LIMIT 50 OFFSET 0`, query)
Expand Down
29 changes: 16 additions & 13 deletions api/handlers/operations/postgres_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

const baseQuery = `
SELECT
wot.attestation_vaas_id as transaction_attestation_vaas_id,
wot.attestation_id as transaction_attestation_vaas_id,
wot.message_id as transaction_message_id,
wot.chain_id as transaction_chain_id,
wot.tx_hash as transaction_tx_hash,
Expand Down Expand Up @@ -47,9 +47,9 @@ SELECT
wavp.payload as properties_payload,
wavp.raw_standard_fields as properties_raw_standard_fields
FROM wormholescan.wh_operation_transactions wot
LEFT JOIN wormholescan.wh_attestation_vaas wav ON wav.id = wot.attestation_vaas_id
LEFT JOIN wormholescan.wh_operation_prices wop ON wop.id = wot.attestation_vaas_id
LEFT JOIN wormholescan.wh_attestation_vaa_properties wavp ON wavp.id = wot.attestation_vaas_id
LEFT JOIN wormholescan.wh_attestation_vaas wav ON wav.id = wot.attestation_id
LEFT JOIN wormholescan.wh_operation_prices wop ON wop.id = wot.attestation_id
LEFT JOIN wormholescan.wh_operation_properties wavp ON wavp.id = wot.attestation_id
`

type operationResult struct {
Expand Down Expand Up @@ -497,7 +497,7 @@ func (r *PostgresRepository) toPrices(op *operationResult) *operationPrices {

func (r *PostgresRepository) findOperationByIDs(ctx context.Context, ids []string) ([]*operationResult, error) {
var ops []*operationResult
query := baseQuery + `WHERE wot.attestation_vaas_id = ANY($1)`
query := baseQuery + `WHERE wot.attestation_id = ANY($1)`
err := r.db.Select(ctx, &ops, query, pq.Array(ids))
if err != nil {
return nil, err
Expand All @@ -523,9 +523,9 @@ func (r *PostgresRepository) buildQueryIDsForTxHash(txHash string, from, to *tim
timestampCondition = " AND " + strings.Join(conditions, " AND ")
}
query := fmt.Sprintf(`
SELECT t.attestation_vaas_id FROM wormholescan.wh_operation_transactions t
SELECT t.attestation_id FROM wormholescan.wh_operation_transactions t
WHERE t.tx_hash = $1 %s
ORDER BY t.timestamp %s, t.attestation_vaas_id %s
ORDER BY t.timestamp %s, t.attestation_id %s
LIMIT $2 OFFSET $3`, timestampCondition, sort, sort)
return query, params
}
Expand All @@ -551,8 +551,8 @@ func (r *PostgresRepository) buildQueryIDsForAddress(address string, from, to *t
query := fmt.Sprintf(`
SELECT oa.id FROM wormholescan.wh_operation_addresses oa
WHERE oa.address = $1 AND exists (
SELECT ot.attestation_vaas_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_vaas_id = oa.id
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = oa.id
) %s
ORDER BY oa."timestamp" %s, oa.id %s
LIMIT $2 OFFSET $3`, timestampCondition, sort, sort)
Expand Down Expand Up @@ -611,9 +611,9 @@ func (r *PostgresRepository) buildQueryIDsForQuery(query OperationQuery, paginat
timestampCondition = " AND " + strings.Join(conditions, " AND ")
}
querySql := fmt.Sprintf(`
SELECT op.attestation_vaas_id FROM wormholescan.wh_operation_transactions op
SELECT op.attestation_id FROM wormholescan.wh_operation_transactions op
WHERE op."type" = 'source-tx' %s
ORDER BY op."timestamp" %s, op.attestation_vaas_id %s
ORDER BY op."timestamp" %s, op.attestation_id %s
LIMIT $1 OFFSET $2`, timestampCondition, sort, sort)
return querySql, params
}
Expand All @@ -630,8 +630,11 @@ func (r *PostgresRepository) buildQueryIDsForQuery(query OperationQuery, paginat

condition := strings.Join(conditions, " AND ")
querySql := fmt.Sprintf(`
SELECT p.id FROM wormholescan.wh_attestation_vaa_properties p
WHERE %s
SELECT p.id FROM wormholescan.wh_operation_properties p
WHERE exists (
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = p.id
) AND %s
ORDER BY p.timestamp %s, p.id %s
LIMIT $%d OFFSET $%d`, condition, sort, sort, len(params)+1, len(params)+2)
params = append(params, pagination.Limit, pagination.Skip)
Expand Down
46 changes: 26 additions & 20 deletions api/handlers/operations/postgres_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func Test_buildQueryIDsForAddress(t *testing.T) {
expectedQuery := `
SELECT oa.id FROM wormholescan.wh_operation_addresses oa
WHERE oa.address = $1 AND exists (
SELECT ot.attestation_vaas_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_vaas_id = oa.id
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = oa.id
)
ORDER BY oa."timestamp" DESC, oa.id DESC
LIMIT $2 OFFSET $3`
Expand All @@ -43,8 +43,8 @@ func Test_buildQueryIDsForAddressWithFromAndTo(t *testing.T) {
expectedQuery := `
SELECT oa.id FROM wormholescan.wh_operation_addresses oa
WHERE oa.address = $1 AND exists (
SELECT ot.attestation_vaas_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_vaas_id = oa.id
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = oa.id
) AND oa."timestamp" >= $4 AND oa."timestamp" <= $5
ORDER BY oa."timestamp" DESC, oa.id DESC
LIMIT $2 OFFSET $3`
Expand All @@ -70,8 +70,8 @@ func Test_buildQueryIDsForAddressWithTo(t *testing.T) {
expectedQuery := `
SELECT oa.id FROM wormholescan.wh_operation_addresses oa
WHERE oa.address = $1 AND exists (
SELECT ot.attestation_vaas_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_vaas_id = oa.id
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = oa.id
) AND oa."timestamp" <= $4
ORDER BY oa."timestamp" DESC, oa.id DESC
LIMIT $2 OFFSET $3`
Expand All @@ -93,9 +93,9 @@ func Test_buildQueryIDsForTxHash(t *testing.T) {
pagination := pagination.Default()

expectedQuery := `
SELECT t.attestation_vaas_id FROM wormholescan.wh_operation_transactions t
SELECT t.attestation_id FROM wormholescan.wh_operation_transactions t
WHERE t.tx_hash = $1
ORDER BY t.timestamp DESC, t.attestation_vaas_id DESC
ORDER BY t.timestamp DESC, t.attestation_id DESC
LIMIT $2 OFFSET $3`

query, params := repo.buildQueryIDsForTxHash("0x87ebf4ae9a729855e491557270dc3e69da04092e6f6ca0025b2f88a2c1ea9be6", nil, nil, *pagination)
Expand All @@ -115,9 +115,9 @@ func Test_buildQueryIDsForTxHashWithFromAndTo(t *testing.T) {
to := time.Date(2024, 7, 6, 0, 36, 0, 0, time.UTC)

expectedQuery := `
SELECT t.attestation_vaas_id FROM wormholescan.wh_operation_transactions t
SELECT t.attestation_id FROM wormholescan.wh_operation_transactions t
WHERE t.tx_hash = $1 AND t."timestamp" >= $4 AND t."timestamp" <= $5
ORDER BY t.timestamp DESC, t.attestation_vaas_id DESC
ORDER BY t.timestamp DESC, t.attestation_id DESC
LIMIT $2 OFFSET $3`

query, params := repo.buildQueryIDsForTxHash("0x87ebf4ae9a729855e491557270dc3e69da04092e6f6ca0025b2f88a2c1ea9be6", &from, &to, *pagination)
Expand All @@ -136,9 +136,9 @@ func Test_buildQueryIDsForQuery(t *testing.T) {
pagination := pagination.Default()

expectedQuery := `
SELECT op.attestation_vaas_id FROM wormholescan.wh_operation_transactions op
SELECT op.attestation_id FROM wormholescan.wh_operation_transactions op
WHERE op."type" = 'source-tx'
ORDER BY op."timestamp" DESC, op.attestation_vaas_id DESC
ORDER BY op."timestamp" DESC, op.attestation_id DESC
LIMIT $1 OFFSET $2`

query, params := repo.buildQueryIDsForQuery(OperationQuery{}, *pagination)
Expand All @@ -157,9 +157,9 @@ func Test_buildQueryIDsForQueryWithFromAndTo(t *testing.T) {
q := OperationQuery{From: &from, To: &to}

expectedQuery := `
SELECT op.attestation_vaas_id FROM wormholescan.wh_operation_transactions op
SELECT op.attestation_id FROM wormholescan.wh_operation_transactions op
WHERE op."type" = 'source-tx' AND op."timestamp" >= $3 AND op."timestamp" <= $4
ORDER BY op."timestamp" DESC, op.attestation_vaas_id DESC
ORDER BY op."timestamp" DESC, op.attestation_id DESC
LIMIT $1 OFFSET $2`

query, params := repo.buildQueryIDsForQuery(q, *pagination)
Expand All @@ -179,9 +179,9 @@ func Test_buildQueryIDsForQueryWithFrom(t *testing.T) {
q := OperationQuery{From: &from}

expectedQuery := `
SELECT op.attestation_vaas_id FROM wormholescan.wh_operation_transactions op
SELECT op.attestation_id FROM wormholescan.wh_operation_transactions op
WHERE op."type" = 'source-tx' AND op."timestamp" >= $3
ORDER BY op."timestamp" DESC, op.attestation_vaas_id DESC
ORDER BY op."timestamp" DESC, op.attestation_id DESC
LIMIT $1 OFFSET $2`

query, params := repo.buildQueryIDsForQuery(q, *pagination)
Expand All @@ -199,8 +199,11 @@ func Test_buildQueryIDsForQueryWithTargetChains(t *testing.T) {
q := OperationQuery{TargetChainIDs: []sdk.ChainID{sdk.ChainIDSolana, sdk.ChainIDEthereum}}

expectedQuery := `
SELECT p.id FROM wormholescan.wh_attestation_vaa_properties p
WHERE p.to_chain_id = ANY($1)
SELECT p.id FROM wormholescan.wh_operation_properties p
WHERE exists (
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = p.id
) AND p.to_chain_id = ANY($1)
ORDER BY p.timestamp DESC, p.id DESC
LIMIT $2 OFFSET $3`

Expand All @@ -220,8 +223,11 @@ func Test_buildQueryIDsForQueryWithTargetChainsAndFrom(t *testing.T) {
q := OperationQuery{From: &from, SourceChainIDs: []sdk.ChainID{sdk.ChainIDAptos}}

expectedQuery := `
SELECT p.id FROM wormholescan.wh_attestation_vaa_properties p
WHERE p.from_chain_id = ANY($1) AND p."timestamp" >= $2
SELECT p.id FROM wormholescan.wh_operation_properties p
WHERE exists (
SELECT ot.attestation_id FROM wormholescan.wh_operation_transactions ot
WHERE ot.attestation_id = p.id
) AND p.from_chain_id = ANY($1) AND p."timestamp" >= $2
ORDER BY p.timestamp DESC, p.id DESC
LIMIT $3 OFFSET $4`

Expand Down
10 changes: 5 additions & 5 deletions api/handlers/transactions/postgres_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ SELECT
wavp.payload as properties_payload,
wavp.raw_standard_fields as properties_raw_standard_fields
FROM wormholescan.wh_attestation_vaas wav
LEFT JOIN wormholescan.wh_operation_transactions wot ON wav.id = wot.attestation_vaas_id
LEFT JOIN wormholescan.wh_operation_prices wop ON wop.id = wot.attestation_vaas_id
LEFT JOIN wormholescan.wh_attestation_vaa_properties wavp ON wavp.id = wot.attestation_vaas_id
LEFT JOIN wormholescan.wh_operation_transactions wot ON wav.id = wot.attestation_id
LEFT JOIN wormholescan.wh_operation_prices wop ON wop.id = wot.attestation_id
LEFT JOIN wormholescan.wh_operation_properties wavp ON wavp.id = wot.attestation_id
`

type totalPythResult struct {
Expand Down Expand Up @@ -100,7 +100,7 @@ type operationTxResult struct {
Type string `db:"type"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
AttestationVaasID string `db:"attestation_vaas_id"`
AttestationVaasID string `db:"attestation_id"`
MessageID string `db:"message_id"`
Status *string `db:"status"`
FromAddress *string `db:"from_address"`
Expand All @@ -118,7 +118,7 @@ func (r *PostgresRepository) FindGlobalTransactionByID(
q *GlobalTransactionQuery,
) (*GlobalTransactionDoc, error) {

query := `SELECT chain_id, tx_hash, "type", created_at, updated_at, attestation_vaas_id, message_id, status,
query := `SELECT chain_id, tx_hash, "type", created_at, updated_at, attestation_id, message_id, status,
from_address, to_address, block_number, blockchain_method, fee_detail, timestamp, rpc_response
FROM wormholescan.wh_operation_transactions
WHERE message_id = $1
Expand Down
Loading

0 comments on commit 1de45fe

Please sign in to comment.